Skip to content

Commit

Permalink
Merged branch 'jetty-9.1.x' into 'master'.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbordet committed Apr 11, 2014
2 parents f2f5353 + 5eeda38 commit 3717115
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 68 deletions.
Expand Up @@ -215,6 +215,10 @@ public void close()
LOG.debug("Closed {}", this);
}

public void release(Connection connection)
{
}

public void close(Connection connection)
{
}
Expand Down
Expand Up @@ -140,8 +140,11 @@ public void run()

protected abstract void send(C connection, HttpExchange exchange);

public void release(C connection)
@Override
public void release(Connection c)
{
@SuppressWarnings("unchecked")
C connection = (C)c;
LOG.debug("{} released", connection);
HttpClient client = getHttpClient();
if (client.isRunning())
Expand Down
Expand Up @@ -28,8 +28,6 @@
import org.eclipse.jetty.fcgi.generator.Generator;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.IdleTimeout;

Expand Down Expand Up @@ -83,55 +81,54 @@ public boolean abort(Throwable cause)
return receiver.abort(cause);
}

protected void responseBegin(int code, String reason)
protected boolean responseBegin(int code, String reason)
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
{
exchange.getResponse().version(version).status(code).reason(reason);
receiver.responseBegin(exchange);
}
if (exchange == null)
return false;
exchange.getResponse().version(version).status(code).reason(reason);
return receiver.responseBegin(exchange);
}

protected void responseHeader(HttpField field)
protected boolean responseHeader(HttpField field)
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseHeader(exchange, field);
return exchange != null && receiver.responseHeader(exchange, field);
}

protected void responseHeaders()
protected boolean responseHeaders()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseHeaders(exchange);
return exchange != null && receiver.responseHeaders(exchange);
}

protected void content(ByteBuffer buffer)
protected boolean content(ByteBuffer buffer)
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseContent(exchange, buffer);
return exchange != null && receiver.responseContent(exchange, buffer);
}

protected void responseSuccess()
protected boolean responseSuccess()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseSuccess(exchange);
return exchange != null && receiver.responseSuccess(exchange);
}

protected boolean responseFailure(Throwable failure)
{
HttpExchange exchange = getHttpExchange();
return exchange != null && receiver.responseFailure(failure);
}

@Override
public void exchangeTerminated(Result result)
{
super.exchangeTerminated(result);
idle.onClose();
boolean close = result.isFailed();
HttpFields responseHeaders = result.getResponse().getHeaders();
close |= responseHeaders.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
if (close)
connection.close();
else
if (result.isFailed())
connection.close(result.getFailure());
else if (!connection.closeByHTTP(responseHeaders))
connection.release(this);
}

Expand All @@ -154,7 +151,8 @@ public FCGIIdleTimeout(HttpConnectionOverFCGI connection, long idleTimeout)
@Override
protected void onIdleExpired(TimeoutException timeout)
{
LOG.debug("Idle timeout for request {}", request);
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout for request {}", request);
connection.abort(timeout);
}

Expand Down
Expand Up @@ -69,7 +69,7 @@ public HttpDestination newHttpDestination(Origin origin)
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpConnectionOverFCGI connection = new HttpConnectionOverFCGI(endPoint, destination);
HttpConnectionOverFCGI connection = new HttpConnectionOverFCGI(endPoint, destination, isMultiplexed());
LOG.debug("Created {}", connection);
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
Expand Down
Expand Up @@ -31,14 +31,16 @@
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.PoolingHttpDestination;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.fcgi.generator.Flusher;
import org.eclipse.jetty.fcgi.parser.ClientParser;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
Expand All @@ -55,14 +57,16 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
private final AtomicBoolean closed = new AtomicBoolean();
private final Flusher flusher;
private final HttpDestination destination;
private final boolean multiplexed;
private final Delegate delegate;
private final ClientParser parser;

public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination)
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed)
{
super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO());
this.flusher = new Flusher(endPoint);
this.destination = destination;
this.multiplexed = multiplexed;
this.flusher = new Flusher(endPoint);
this.delegate = new Delegate(destination);
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);
Expand Down Expand Up @@ -103,7 +107,7 @@ public void onFillable()
while (true)
{
int read = endPoint.fill(buffer);
if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'
if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'.
LOG.debug("Read {} bytes from {}", read, endPoint);
if (read > 0)
{
Expand All @@ -124,7 +128,7 @@ else if (read == 0)
catch (Exception x)
{
LOG.debug(x);
// TODO: fail and close ?
close(x);
}
finally
{
Expand All @@ -140,7 +144,12 @@ private void parse(ByteBuffer buffer)

private void shutdown()
{
close(new EOFException());
// Close explicitly only if we are idle, since the request may still
// be in progress, otherwise close only if we can fail the responses.
if (channels.isEmpty())
close();
else
failAndClose(new EOFException());
}

@Override
Expand All @@ -153,13 +162,7 @@ protected boolean onReadTimeout()
protected void release(HttpChannelOverFCGI channel)
{
channels.remove(channel.getRequest());
if (destination instanceof PoolingHttpDestination)
{
@SuppressWarnings("unchecked")
PoolingHttpDestination<HttpConnectionOverFCGI> fcgiDestination =
(PoolingHttpDestination<HttpConnectionOverFCGI>)destination;
fcgiDestination.release(this);
}
destination.release(this);
}

@Override
Expand All @@ -168,7 +171,7 @@ public void close()
close(new AsynchronousCloseException());
}

private void close(Throwable failure)
protected void close(Throwable failure)
{
if (closed.compareAndSet(false, true))
{
Expand All @@ -184,6 +187,16 @@ private void close(Throwable failure)
}
}

protected boolean closeByHTTP(HttpFields fields)
{
if (multiplexed)
return false;
if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))
return false;
close();
return true;
}

protected void abort(Throwable failure)
{
for (HttpChannelOverFCGI channel : channels.values())
Expand All @@ -195,6 +208,15 @@ protected void abort(Throwable failure)
channels.clear();
}

private void failAndClose(Throwable failure)
{
boolean result = false;
for (HttpChannelOverFCGI channel : channels.values())
result |= channel.responseFailure(failure);
if (result)
close(failure);
}

private int acquireRequest()
{
synchronized (requests)
Expand Down Expand Up @@ -322,8 +344,23 @@ public void onEnd(int request)
HttpChannelOverFCGI channel = channels.get(request);
if (channel != null)
{
channel.responseSuccess();
releaseRequest(request);
if (channel.responseSuccess())
releaseRequest(request);
}
else
{
noChannel(request);
}
}

@Override
public void onFailure(int request, Throwable failure)
{
HttpChannelOverFCGI channel = channels.get(request);
if (channel != null)
{
if (channel.responseFailure(failure))
releaseRequest(request);
}
else
{
Expand Down
Expand Up @@ -88,21 +88,36 @@ public Result generateResponseHeaders(int request, int code, String reason, Http
return generateContent(request, buffer, true, false, callback, FCGI.FrameType.STDOUT);
}

public Result generateResponseContent(int request, ByteBuffer content, boolean lastContent, Callback callback)
public Result generateResponseContent(int request, ByteBuffer content, boolean lastContent, boolean aborted, Callback callback)
{
Result result = generateContent(request, content, false, lastContent, callback, FCGI.FrameType.STDOUT);
if (lastContent)
if (aborted)
{
// Generate the FCGI_END_REQUEST
request &= 0xFF_FF;
ByteBuffer endRequestBuffer = byteBufferPool.acquire(8, false);
BufferUtil.clearToFill(endRequestBuffer);
endRequestBuffer.putInt(0x01_03_00_00 + request);
endRequestBuffer.putInt(0x00_08_00_00);
endRequestBuffer.putLong(0x00L);
endRequestBuffer.flip();
result = result.append(endRequestBuffer, true);
Result result = new Result(byteBufferPool, callback);
if (lastContent)
result.append(generateEndRequest(request, true), true);
else
result.append(BufferUtil.EMPTY_BUFFER, false);
return result;
}
return result;
else
{
Result result = generateContent(request, content, false, lastContent, callback, FCGI.FrameType.STDOUT);
if (lastContent)
result.append(generateEndRequest(request, false), true);
return result;
}
}

private ByteBuffer generateEndRequest(int request, boolean aborted)
{
request &= 0xFF_FF;
ByteBuffer endRequestBuffer = byteBufferPool.acquire(8, false);
BufferUtil.clearToFill(endRequestBuffer);
endRequestBuffer.putInt(0x01_03_00_00 + request);
endRequestBuffer.putInt(0x00_08_00_00);
endRequestBuffer.putInt(aborted ? 1 : 0);
endRequestBuffer.putInt(0);
endRequestBuffer.flip();
return endRequestBuffer;
}
}
Expand Up @@ -98,5 +98,13 @@ public void onEnd(int request)
for (StreamContentParser streamParser : streamParsers)
streamParser.end(request);
}

@Override
public void onFailure(int request, Throwable failure)
{
listener.onFailure(request, failure);
for (StreamContentParser streamParser : streamParsers)
streamParser.end(request);
}
}
}
Expand Up @@ -107,8 +107,12 @@ public boolean parse(ByteBuffer buffer)

private void onEnd()
{
// TODO: if protocol != 0, invoke an error callback
listener.onEnd(getRequest());
if (application != 0)
listener.onFailure(getRequest(), new Exception("FastCGI application returned code " + application));
else if (protocol != 0)
listener.onFailure(getRequest(), new Exception("FastCGI server returned code " + protocol));
else
listener.onEnd(getRequest());
}

private void reset()
Expand Down
Expand Up @@ -100,6 +100,8 @@ public interface Listener

public void onEnd(int request);

public void onFailure(int request, Throwable failure);

public static class Adapter implements Listener
{
@Override
Expand All @@ -121,6 +123,12 @@ public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
public void onEnd(int request)
{
}

@Override
public void onFailure(int request, Throwable failure)
{

}
}
}

Expand Down

0 comments on commit 3717115

Please sign in to comment.