Skip to content

Commit

Permalink
[GEOS-8613] Add support for priority in control-flow bounded queues
Browse files Browse the repository at this point in the history
  • Loading branch information
aaime committed Mar 16, 2018
1 parent f8c24cc commit a56017b
Show file tree
Hide file tree
Showing 18 changed files with 707 additions and 79 deletions.
18 changes: 18 additions & 0 deletions doc/en/user/source/extensions/controlflow/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,24 @@ A few examples::
ows.wms.getmap=8
# don't allow more than 2 WFS GetFeature requests with Excel output format
ows.wfs.getfeature.application/msexcel=2
Request priority support
........................

Requests controlled by "ows.*" controllers above can be also executed in priority order, in case there are too many
the request will block and wait, and will we awoken in priority order (highest to lowest).

Currently the only way to specific a priority for a request is to add it to a request HTTP header::

ows.priority.http=<headerName>,<defaultPriority>
The header "headerName" will contain a number defining the priority for the request, the default priority is used
as a fallback if/when the header is not found.

Using a header implies some other system is involved in the priority management. This is particulary good when using
a load balancer, as the requests priorities need to be evenly split across cluster elements, control-flow only
has visibility of a single instance. As an example, the priority will be de-facto ignored at the cluster level
if there are two nodes, and for whatever chance or design, the high priority requests end up converging on the same cluster node.

Per user concurrency control
............................
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@
import org.geotools.util.logging.Logging;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.ConstructorArgumentValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
Expand Down Expand Up @@ -76,7 +71,9 @@ public CallbackContext(Request request, List<FlowController> controllers, long t

}

static ThreadLocal<CallbackContext> REQUEST_CONTROLLERS = new ThreadLocal<CallbackContext>();
static ThreadLocal<CallbackContext> REQUEST_CONTROLLERS = new ThreadLocal<>();

static ThreadLocal<Boolean> FAILED_ON_FLOW_CONTROLLERS = new ThreadLocal<>();

FlowControllerProvider provider;

Expand Down Expand Up @@ -120,6 +117,7 @@ public Operation operationDispatched(Request request, Operation operation) {

blockedRequests.incrementAndGet();
long start = System.currentTimeMillis();
boolean failedOnFlowControllers = true;
try {
// the operation has not been set in the Request yet by the dispatcher, do so now in
// a clone of the Request
Expand Down Expand Up @@ -167,12 +165,17 @@ public Operation operationDispatched(Request request, Operation operation) {
}
}
}
failedOnFlowControllers = false;
} finally {
blockedRequests.decrementAndGet();
runningRequests.incrementAndGet();
if (!failedOnFlowControllers) {
runningRequests.incrementAndGet();
}
FAILED_ON_FLOW_CONTROLLERS.set(failedOnFlowControllers);

if(REQUEST_CONTROLLERS.get() != null) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Request started, running requests: " + getRunningRequests() + ", blocked requests: "
LOGGER.info("Request control-flow performed, running requests: " + getRunningRequests() + ", blocked requests: "
+ getBlockedRequests());
}
}
Expand Down Expand Up @@ -258,7 +261,9 @@ private void releaseControllers(boolean forceRelease) {
if (context != null) {
context.nestingLevel--;
if(context.nestingLevel <= 0 || forceRelease) {
runningRequests.decrementAndGet();
if (Boolean.FALSE.equals(FAILED_ON_FLOW_CONTROLLERS.get())) {
runningRequests.decrementAndGet();
}
// call back the same controllers we used when the operation started, releasing
// them in inverse order
LOGGER.info("releasing flow controllers for [" + context.request + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ public interface FlowController {
* @return True if the request was processed successfully, false if the request timed out during
* the wait
*/
public boolean requestIncoming(Request request, long timeout);
boolean requestIncoming(Request request, long timeout);

/**
* Called when the request is done its processing
* Called when the request is done its processing (will be called both for executing and timeout out requests
* to ensure eventually required clean ups)
*
* @param request the request
*/
public void requestComplete(Request request);
void requestComplete(Request request);

/**
* Returns the flow controller priority. Lower numbers mean higher priority. For flow
* controllers that limit the number of incoming requests by using a blocing queue it is advised
* Returns the flow controller "priority", determines the order in which the controllers are being called,
* from lower to higher (not to be confused with the request priority).
* For controllers that limit the number of incoming requests by using a blocking queue it is advised
* to use the queue size itself as the controller priority.
*
*
*/
public int getPriority();
int getPriority();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,23 @@
*/
package org.geoserver.flow.config;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.geoserver.config.GeoServerPluginConfigurator;
import org.geoserver.flow.ControlFlowConfigurator;
import org.geoserver.flow.FlowController;
import org.geoserver.flow.controller.BasicOWSController;
import org.geoserver.flow.controller.CookieKeyGenerator;
import org.geoserver.flow.controller.GlobalFlowController;
import org.geoserver.flow.controller.HttpHeaderPriorityProvider;
import org.geoserver.flow.controller.IpFlowController;
import org.geoserver.flow.controller.IpKeyGenerator;
import org.geoserver.flow.controller.KeyGenerator;
import org.geoserver.flow.controller.OWSRequestMatcher;
import org.geoserver.flow.controller.PriorityProvider;
import org.geoserver.flow.controller.PriorityThreadBlocker;
import org.geoserver.flow.controller.RateFlowController;
import org.geoserver.flow.controller.SimpleThreadBlocker;
import org.geoserver.flow.controller.SingleIpFlowController;
import org.geoserver.flow.controller.ThreadBlocker;
import org.geoserver.flow.controller.UserConcurrentFlowController;
import org.geoserver.platform.GeoServerExtensions;
import org.geoserver.platform.GeoServerResourceLoader;
Expand All @@ -38,6 +32,17 @@
import org.geoserver.security.PropertyFileWatcher;
import org.geotools.util.logging.Logging;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Basic property file based {@link ControlFlowConfigurator} implementation
*
Expand Down Expand Up @@ -107,19 +112,20 @@ public List<FlowController> buildFlowControllers() throws Exception {
timeout = -1;

Properties p = configFile.getProperties();
List<FlowController> newControllers = new ArrayList<FlowController>();
List<FlowController> newControllers = new ArrayList<>();
PriorityProvider priorityProvider = getPriorityProvider(p);

for (Object okey : p.keySet()) {
String key = ((String) okey).trim();
String value = (String) p.get(okey);
LOGGER.info("Loading control-flow configuration: " + key + "=" + value);

String[] keys = key.split("\\s*\\.\\s*");

int queueSize = 0;
StringTokenizer tokenizer = new StringTokenizer(value, ",");
try {
// some properties are not integers
if("ip.blacklist".equals(key) || "ip.whitelist".equals(key)) {
if("ip.blacklist".equals(key) || "ip.whitelist".equals(key) || "ows.priority.http".equals(key)) {
continue;
} else {
if (!key.startsWith("user.ows") && !key.startsWith("ip.ows")) {
Expand All @@ -142,15 +148,16 @@ public List<FlowController> buildFlowControllers() throws Exception {
continue;
}
if ("ows.global".equalsIgnoreCase(key)) {
controller = new GlobalFlowController(queueSize);
controller = new GlobalFlowController(queueSize, buildBlocker(queueSize, priorityProvider));
} else if ("ows".equals(keys[0])) {
// todo: check, if possible, if the service, method and output format actually exist
ThreadBlocker threadBlocker = buildBlocker(queueSize, priorityProvider);
if (keys.length >= 4) {
controller = new BasicOWSController(keys[1], keys[2], keys[3], queueSize);
controller = new BasicOWSController(keys[1], keys[2], keys[3], queueSize, threadBlocker);
} else if (keys.length == 3) {
controller = new BasicOWSController(keys[1], keys[2], queueSize);
controller = new BasicOWSController(keys[1], keys[2], queueSize, threadBlocker);
} else if (keys.length == 2) {
controller = new BasicOWSController(keys[1], queueSize);
controller = new BasicOWSController(keys[1], queueSize, threadBlocker);
}
} else if ("user".equals(keys[0])) {
if (keys.length == 1) {
Expand Down Expand Up @@ -178,22 +185,75 @@ protected KeyGenerator buildKeyGenerator(String[] keys, String value) {

}.build(keys, value);
} else if (keys.length > 1) {
if(!"blacklist".equals(keys[1]) && !"whitelist".equals(keys[1])){
String ip = key.substring("ip.".length());
controller = new SingleIpFlowController(queueSize, ip);
}
if (!"blacklist".equals(keys[1]) && !"whitelist".equals(keys[1])) {
String ip = key.substring("ip.".length());
controller = new SingleIpFlowController(queueSize, ip);
}
}
}

if (controller == null) {
LOGGER.severe("Could not parse rule '" + okey + "=" + value);
LOGGER.severe("Could not parse control-flow rule: '" + okey + "=" + value);
} else {
LOGGER.info("Loaded control-flow rule: " + key + "=" + value);
newControllers.add(controller);
}
}

return newControllers;
}

/**
* Parses the configuration for priority providers
*
* @param p the configuration properties
* @return A {@link PriorityProvider} or null if no (valid) configuration was found
*/
private PriorityProvider getPriorityProvider(Properties p) {
for (Object okey : p.keySet()) {
String key = ((String) okey).trim();
String value = (String) p.get(okey);

// is it a priority specification?
if ("ows.priority.http".equals(key)) {
String error = "";
try {
String[] splitValue = value.split("\\s*,\\s*");
if (splitValue.length == 2 && splitValue[0].length() > 0) {
String httpHeaderName = splitValue[0];
int defaultPriority = Integer.parseInt(splitValue[1]);

LOGGER.info("Found OWS priority specification " + key + "=" + value);
return new HttpHeaderPriorityProvider(httpHeaderName, defaultPriority);
}
} catch (NumberFormatException e) {
error = " " + e.getMessage();
}

LOGGER.severe("Unexpected priority specification found '" + value + "', " +
"the expected format is headerName,defaultPriorityValue." + error);
}


}
return null;
}

/**
* Builds a {@link ThreadBlocker} based on a queue size and a prority provider
* @param queueSize The count of concurrent requests allowed to run
* @param priorityProvider The priority provider (if not null, a
* {@link org.geoserver.flow.controller.PriorityThreadBlocker} will be built
* @return a {@link ThreadBlocker}
*/
private ThreadBlocker buildBlocker(int queueSize, PriorityProvider priorityProvider) {
if (priorityProvider != null) {
return new PriorityThreadBlocker(queueSize, priorityProvider);
} else {
return new SimpleThreadBlocker(queueSize);
}
}

public boolean isStale() {
return configFile.isStale();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@
*
*/
public class BasicOWSController extends SingleQueueFlowController {
public BasicOWSController(String service, int queueSize) {
this(service, null, null, queueSize);

public BasicOWSController(String service, int controllerPriority, ThreadBlocker blocker) {
this(service, null, null, controllerPriority, blocker);
}

public BasicOWSController(String service, String method, int queueSize) {
this(service, method, null, queueSize);
public BasicOWSController(String service, String method, int controllerPriority, ThreadBlocker blocker) {
this(service, method, null, controllerPriority, blocker);
}

public BasicOWSController(String service, String method, String outputFormat, int queueSize) {
super(queueSize, new OWSRequestMatcher(service, method, outputFormat));
public BasicOWSController(String service, String method, String outputFormat, int controllerPriority, ThreadBlocker blocker) {
super(new OWSRequestMatcher(service, method, outputFormat), controllerPriority, blocker);
}

@Override
public String toString() {
return "BasicOWSController(" + matcher + "," + queueSize + ")";
return "BasicOWSController(" + matcher + "," + blocker + ")";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
*/
public class GlobalFlowController extends SingleQueueFlowController {

public GlobalFlowController(int queueSize) {
super(queueSize, new OWSRequestMatcher());
public GlobalFlowController(int controllerPriority, ThreadBlocker blocker) {
super(new OWSRequestMatcher(), controllerPriority, blocker);
}

@Override
public String toString() {
return "GlobalFlowController(" + queueSize + ")";
return "GlobalFlowController(" + blocker + ")";
}

}

0 comments on commit a56017b

Please sign in to comment.