Skip to content

Commit

Permalink
Refactor async timeouts
Browse files Browse the repository at this point in the history
Move async timeout thread to Protocol
Move tracking of timeout setting to Processor
Move tracking of last asycn start to AsyncStateMachine

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1709578 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Oct 20, 2015
1 parent 10239c7 commit a6c64ef
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 227 deletions.
34 changes: 34 additions & 0 deletions java/org/apache/coyote/AbstractProcessor.java
Expand Up @@ -40,6 +40,7 @@ public abstract class AbstractProcessor extends AbstractProcessorLight implement

protected Adapter adapter;
protected final AsyncStateMachine asyncStateMachine;
private volatile long asyncTimeout = -1;
protected final AbstractEndpoint<?> endpoint;
protected final Request request;
protected final Response response;
Expand Down Expand Up @@ -242,6 +243,39 @@ protected void dispatchNonBlockingRead() {
}


@Override
public void timeoutAsync(long now) {
if (now < 0) {
doTimeoutAsync();
} else {
long asyncTimeout = getAsyncTimeout();
if (asyncTimeout > 0) {
long asyncStart = asyncStateMachine.getLastAsyncStart();
if ((now - asyncStart) > asyncTimeout) {
doTimeoutAsync();
}
}
}
}


private void doTimeoutAsync() {
// Avoid multiple timeouts
setAsyncTimeout(-1);
socketWrapper.processSocket(SocketStatus.TIMEOUT, true);
}


public void setAsyncTimeout(long timeout) {
asyncTimeout = timeout;
}


public long getAsyncTimeout() {
return asyncTimeout;
}


@Override
public void recycle() {
errorState = ErrorState.NONE;
Expand Down
101 changes: 95 additions & 6 deletions java/org/apache/coyote/AbstractProtocol.java
Expand Up @@ -19,6 +19,8 @@
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -86,9 +88,20 @@ public abstract class AbstractProtocol<S> implements ProtocolHandler,
*/
private final AbstractEndpoint<S> endpoint;


private Handler<S> handler;


private final Set<Processor> waitingProcessors =
Collections.newSetFromMap(new ConcurrentHashMap<Processor, Boolean>());


/**
* The async timeout thread.
*/
private AsyncTimeout asyncTimeout = null;


public AbstractProtocol(AbstractEndpoint<S> endpoint) {
this.endpoint = endpoint;
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
Expand Down Expand Up @@ -182,6 +195,11 @@ public boolean isSendfileSupported() {
}


public AsyncTimeout getAsyncTimeout() {
return asyncTimeout;
}


// ---------------------- Properties that are passed through to the EndPoint

@Override
Expand Down Expand Up @@ -338,6 +356,16 @@ public String getName() {
}


public void addWaitingProcessor(Processor processor) {
waitingProcessors.add(processor);
}


public void removeWaitingProcessor(Processor processor) {
waitingProcessors.remove(processor);
}


// ----------------------------------------------- Accessors for sub-classes

protected AbstractEndpoint<S> getEndpoint() {
Expand Down Expand Up @@ -514,6 +542,14 @@ public void start() throws Exception {
getName()), ex);
throw ex;
}


// Start async timeout thread
asyncTimeout = new AsyncTimeout();
Thread timeoutThread = new Thread(asyncTimeout, getName() + "-AsyncTimeout");
timeoutThread.setPriority(endpoint.getThreadPriority());
timeoutThread.setDaemon(true);
timeoutThread.start();
}


Expand Down Expand Up @@ -551,6 +587,9 @@ public void stop() throws Exception {
if(getLog().isInfoEnabled())
getLog().info(sm.getString("abstractProtocolHandler.stop",
getName()));

asyncTimeout.stop();

try {
endpoint.stop();
} catch (Exception ex) {
Expand Down Expand Up @@ -648,7 +687,6 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketStatus status) {
return SocketState.CLOSED;
}

wrapper.setAsync(false);
ContainerThreadMarker.set();

try {
Expand Down Expand Up @@ -684,6 +722,8 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketStatus status) {

// Associate the processor with the connection
connections.put(socket, processor);
// Make sure an async timeout doesn't fire
getProtocol().removeWaitingProcessor(processor);

SocketState state = SocketState.CLOSED;
do {
Expand Down Expand Up @@ -719,6 +759,9 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketStatus status) {
// depend on type of long poll
connections.put(socket, processor);
longPoll(wrapper, processor);
if (processor.isAsync()) {
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
Expand Down Expand Up @@ -791,11 +834,8 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketStatus status) {


protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {
if (processor.isAsync()) {
// Async
socket.setAsync(true);
} else {
// This branch is currently only used with HTTP
if (!processor.isAsync()) {
// This is currently only used with HTTP
// Either:
// - this is an upgraded connection
// - the request line/headers have not been completely
Expand Down Expand Up @@ -964,4 +1004,53 @@ public synchronized void clear() {
size.set(0);
}
}


/**
* Async timeout thread
*/
protected class AsyncTimeout implements Runnable {

private volatile boolean asyncTimeoutRunning = true;

/**
* The background thread that checks async requests and fires the
* timeout if there has been no activity.
*/
@Override
public void run() {

// Loop until we receive a shutdown command
while (asyncTimeoutRunning) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
long now = System.currentTimeMillis();
for (Processor processor : waitingProcessors) {
processor.timeoutAsync(now);
}

// Loop if endpoint is paused
while (endpoint.isPaused() && asyncTimeoutRunning) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
}
}


protected void stop() {
asyncTimeoutRunning = false;

// Timeout any pending async request
for (Processor processor : waitingProcessors) {
processor.timeoutAsync(-1);
}
}
}
}
13 changes: 13 additions & 0 deletions java/org/apache/coyote/AsyncStateMachine.java
Expand Up @@ -154,6 +154,7 @@ public boolean getPauseNonContainerThread() {


private volatile AsyncState state = AsyncState.DISPATCHED;
private volatile long lastAsyncStart = 0;
// Need this to fire listener on complete
private AsyncContextCallback asyncCtxt = null;
private final AbstractProcessor processor;
Expand Down Expand Up @@ -188,10 +189,22 @@ public boolean isCompleting() {
return state.isCompleting();
}

/**
* Obtain the time that this connection last transitioned to async
* processing.
*
* @return The time (as returned by {@link System#currentTimeMillis()}) that
* this connection last transitioned to async
*/
public long getLastAsyncStart() {
return lastAsyncStart;
}

public synchronized void asyncStart(AsyncContextCallback asyncCtxt) {
if (state == AsyncState.DISPATCHED) {
state = AsyncState.STARTING;
this.asyncCtxt = asyncCtxt;
lastAsyncStart = System.currentTimeMillis();
} else {
throw new IllegalStateException(
sm.getString("asyncStateMachine.invalidAsyncState",
Expand Down
13 changes: 12 additions & 1 deletion java/org/apache/coyote/Processor.java
Expand Up @@ -49,8 +49,19 @@ public interface Processor {

HttpUpgradeHandler getHttpUpgradeHandler();

boolean isAsync();
boolean isUpgrade();
boolean isAsync();

/**
* Check this processor to see if the async timeout has expired and process
* a timeout if that is that case.
*
* @param now The time (as returned by {@link System#currentTimeMillis()} to
* use as the current time to determine whether the async timeout
* has expired. If negative, the timeout will always be treated
* as if it has expired.
*/
void timeoutAsync(long now);

Request getRequest();

Expand Down
2 changes: 1 addition & 1 deletion java/org/apache/coyote/ajp/AjpProcessor.java
Expand Up @@ -517,7 +517,7 @@ public final void action(ActionCode actionCode, Object param) {
case ASYNC_SETTIMEOUT: {
if (param == null) return;
long timeout = ((Long)param).longValue();
socketWrapper.setAsyncTimeout(timeout);
setAsyncTimeout(timeout);
break;
}
case ASYNC_TIMEOUT: {
Expand Down
2 changes: 1 addition & 1 deletion java/org/apache/coyote/http11/Http11Processor.java
Expand Up @@ -742,7 +742,7 @@ public final void action(ActionCode actionCode, Object param) {
return;
}
long timeout = ((Long) param).longValue();
socketWrapper.setAsyncTimeout(timeout);
setAsyncTimeout(timeout);
break;
}
case ASYNC_DISPATCH: {
Expand Down
Expand Up @@ -95,4 +95,10 @@ public final Request getRequest() {
public ByteBuffer getLeftoverInput() {
return null;
}


@Override
public void timeoutAsync(long now) {
// NO-OP
}
}

0 comments on commit a6c64ef

Please sign in to comment.