Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Netty: Add HTTP pipelining support #8299

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/reference/modules/http.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ when connecting for better performance and try to get your favorite
client not to do
http://en.wikipedia.org/wiki/Chunked_transfer_encoding[HTTP chunking].

IMPORTANT: HTTP pipelining is not supported and should be disabled in your HTTP client.

[float]
=== Settings

Expand Down Expand Up @@ -69,6 +67,9 @@ be cached for. Defaults to `1728000` (20 days)
header should be returned. Note: This header is only returned, when the setting is
set to `true`. Defaults to `false`

|`http.pipelining` |Enable or disable HTTP pipelining, defaults to `true`.

|`http.pipelining.max_events` |The maximum number of events to be queued up in memory before a HTTP connection is closed, defaults to `10000`.

|=======================================================================

Expand Down
7 changes: 4 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1406,17 +1406,18 @@
<include>src/test/java/org/elasticsearch/**/*.java</include>
</includes>
<excludes>
<exclude>src/main/java/org/elasticsearch/common/inject/**</exclude>
<!-- Guice -->
<exclude>src/main/java/org/elasticsearch/common/inject/**</exclude>
<exclude>src/main/java/org/elasticsearch/common/geo/GeoHashUtils.java</exclude>
<exclude>src/main/java/org/elasticsearch/common/lucene/search/XBooleanFilter.java</exclude>
<exclude>src/main/java/org/elasticsearch/common/lucene/search/XFilteredQuery.java</exclude>
<exclude>src/main/java/org/apache/lucene/queryparser/XSimpleQueryParser.java</exclude>
<exclude>src/main/java/org/apache/lucene/**/X*.java</exclude>
<!-- t-digest -->
<exclude>src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java
</exclude>
<exclude>src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java</exclude>
<exclude>src/test/java/org/elasticsearch/search/aggregations/metrics/GroupTree.java</exclude>
<!-- netty pipelining -->
<exclude>src/main/java/org/elasticsearch/http/netty/pipelining/**</exclude>
</excludes>
</configuration>
<executions>
Expand Down
19 changes: 17 additions & 2 deletions src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.http.netty;

import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
import org.elasticsearch.rest.support.RestUtils;
import org.jboss.netty.channel.*;
import org.jboss.netty.handler.codec.http.HttpRequest;
Expand All @@ -34,19 +35,33 @@ public class HttpRequestHandler extends SimpleChannelUpstreamHandler {

private final NettyHttpServerTransport serverTransport;
private final Pattern corsPattern;
private final boolean httpPipeliningEnabled;

public HttpRequestHandler(NettyHttpServerTransport serverTransport) {
this.serverTransport = serverTransport;
this.corsPattern = RestUtils.getCorsSettingRegex(serverTransport.settings());
this.httpPipeliningEnabled = serverTransport.pipelining;
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
HttpRequest request = (HttpRequest) e.getMessage();
HttpRequest request;
OrderedUpstreamMessageEvent oue = null;
if (this.httpPipeliningEnabled && e instanceof OrderedUpstreamMessageEvent) {
oue = (OrderedUpstreamMessageEvent) e;
request = (HttpRequest) oue.getMessage();
} else {
request = (HttpRequest) e.getMessage();
}

// the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally
// when reading, or using a cumalation buffer
NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel());
serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, e.getChannel(), httpRequest, corsPattern));
if (oue != null) {
serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, corsPattern, oue));
} else {
serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, corsPattern));
}
super.messageReceived(ctx, e);
}

Expand Down
29 changes: 23 additions & 6 deletions src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.support.RestUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.*;
import org.jboss.netty.handler.codec.http.*;

import java.util.List;
Expand All @@ -61,16 +61,22 @@ public class NettyHttpChannel extends HttpChannel {
private final NettyHttpServerTransport transport;
private final Channel channel;
private final org.jboss.netty.handler.codec.http.HttpRequest nettyRequest;
private OrderedUpstreamMessageEvent orderedUpstreamMessageEvent = null;
private Pattern corsPattern;

public NettyHttpChannel(NettyHttpServerTransport transport, Channel channel, NettyHttpRequest request, Pattern corsPattern) {
public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, Pattern corsPattern) {
super(request);
this.transport = transport;
this.channel = channel;
this.channel = request.getChannel();
this.nettyRequest = request.request();
this.corsPattern = corsPattern;
}

public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, Pattern corsPattern, OrderedUpstreamMessageEvent orderedUpstreamMessageEvent) {
this(transport, request, corsPattern);
this.orderedUpstreamMessageEvent = orderedUpstreamMessageEvent;
}

@Override
public BytesStreamOutput newBytesOutput() {
return new ReleasableBytesStreamOutput(transport.bigArrays);
Expand Down Expand Up @@ -185,14 +191,25 @@ public void sendResponse(RestResponse response) {
}
}

ChannelFuture future = channel.write(resp);
ChannelFuture future;

if (orderedUpstreamMessageEvent != null) {
OrderedDownstreamChannelEvent downstreamChannelEvent = new OrderedDownstreamChannelEvent(orderedUpstreamMessageEvent, 0, true, resp);
future = downstreamChannelEvent.getFuture();
channel.getPipeline().sendDownstream(downstreamChannelEvent);
} else {
future = channel.write(resp);
}

if (response.contentThreadSafe() && content instanceof Releasable) {
future.addListener(new ReleaseChannelFutureListener((Releasable) content));
addedReleaseListener = true;
}

if (close) {
future.addListener(ChannelFutureListener.CLOSE);
}

} finally {
if (!addedReleaseListener && content instanceof Releasable) {
((Releasable) content).close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ public SocketAddress getLocalAddress() {
return channel.getLocalAddress();
}

public Channel getChannel() {
return channel;
}

@Override
public String header(String name) {
return request.headers().get(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.http.*;
import org.elasticsearch.http.netty.pipelining.HttpPipeliningHandler;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.transport.BindTransportException;
import org.jboss.netty.bootstrap.ServerBootstrap;
Expand Down Expand Up @@ -72,6 +73,13 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
public static final String SETTING_CORS_ALLOW_METHODS = "http.cors.allow-methods";
public static final String SETTING_CORS_ALLOW_HEADERS = "http.cors.allow-headers";
public static final String SETTING_CORS_ALLOW_CREDENTIALS = "http.cors.allow-credentials";
public static final String SETTING_PIPELINING = "http.pipelining";
public static final String SETTING_PIPELINING_MAX_EVENTS = "http.pipelining.max_events";
public static final String SETTING_HTTP_COMPRESSION = "http.compression";
public static final String SETTING_HTTP_COMPRESSION_LEVEL = "http.compression_level";

public static final boolean DEFAULT_SETTING_PIPELINING = true;
public static final int DEFAULT_SETTING_PIPELINING_MAX_EVENTS = 10000;

private final NetworkService networkService;
final BigArrays bigArrays;
Expand All @@ -85,6 +93,10 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer

private final boolean blockingServer;

final boolean pipelining;

private final int pipeliningMaxEvents;

final boolean compression;

private final int compressionLevel;
Expand Down Expand Up @@ -164,8 +176,10 @@ public NettyHttpServerTransport(Settings settings, NetworkService networkService
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
}

this.compression = settings.getAsBoolean("http.compression", false);
this.compressionLevel = settings.getAsInt("http.compression_level", 6);
this.compression = settings.getAsBoolean(SETTING_HTTP_COMPRESSION, false);
this.compressionLevel = settings.getAsInt(SETTING_HTTP_COMPRESSION_LEVEL, 6);
this.pipelining = settings.getAsBoolean(SETTING_PIPELINING, DEFAULT_SETTING_PIPELINING);
this.pipeliningMaxEvents = settings.getAsInt(SETTING_PIPELINING_MAX_EVENTS, DEFAULT_SETTING_PIPELINING_MAX_EVENTS);

// validate max content length
if (maxContentLength.bytes() > Integer.MAX_VALUE) {
Expand All @@ -174,8 +188,8 @@ public NettyHttpServerTransport(Settings settings, NetworkService networkService
}
this.maxContentLength = maxContentLength;

logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], receive_predictor[{}->{}]",
maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictorMin, receivePredictorMax);
logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], receive_predictor[{}->{}], pipelining[{}], pipelining_max_events[{}]",
maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictorMin, receivePredictorMax, pipelining, pipeliningMaxEvents);
}

public Settings settings() {
Expand Down Expand Up @@ -370,6 +384,9 @@ public ChannelPipeline getPipeline() throws Exception {
if (transport.compression) {
pipeline.addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel));
}
if (transport.pipelining) {
pipeline.addLast("pipelining", new HttpPipeliningHandler(transport.pipeliningMaxEvents));
}
pipeline.addLast("handler", requestHandler);
return pipeline;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package org.elasticsearch.http.netty.pipelining;

import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.jboss.netty.channel.*;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequest;

import java.util.*;

/**
* Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their
* corresponding requests. NOTE: A side effect of using this handler is that upstream HttpRequest objects will
* cause the original message event to be effectively transformed into an OrderedUpstreamMessageEvent. Conversely
* OrderedDownstreamChannelEvent objects are expected to be received for the correlating response objects.
*
* @author Christopher Hunt
*/
public class HttpPipeliningHandler extends SimpleChannelHandler {

public static final int INITIAL_EVENTS_HELD = 3;

private final int maxEventsHeld;

private int sequence;
private int nextRequiredSequence;
private int nextRequiredSubsequence;

private final Queue<OrderedDownstreamChannelEvent> holdingQueue;

/**
* @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel
* connection. This is required as events cannot queue up indefintely; we would run out of
* memory if this was the case.
*/
public HttpPipeliningHandler(final int maxEventsHeld) {
this.maxEventsHeld = maxEventsHeld;

holdingQueue = new PriorityQueue<>(INITIAL_EVENTS_HELD, new Comparator<OrderedDownstreamChannelEvent>() {
@Override
public int compare(OrderedDownstreamChannelEvent o1, OrderedDownstreamChannelEvent o2) {
final int delta = o1.getOrderedUpstreamMessageEvent().getSequence() - o2.getOrderedUpstreamMessageEvent().getSequence();
if (delta == 0) {
return o1.getSubsequence() - o2.getSubsequence();
} else {
return delta;
}
}
});
}

public int getMaxEventsHeld() {
return maxEventsHeld;
}

@Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) {
final Object msg = e.getMessage();
if (msg instanceof HttpRequest) {
ctx.sendUpstream(new OrderedUpstreamMessageEvent(sequence++, e.getChannel(), msg, e.getRemoteAddress()));
} else {
ctx.sendUpstream(e);
}
}

@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof OrderedDownstreamChannelEvent) {

boolean channelShouldClose = false;

synchronized (holdingQueue) {
if (holdingQueue.size() < maxEventsHeld) {

final OrderedDownstreamChannelEvent currentEvent = (OrderedDownstreamChannelEvent) e;
holdingQueue.add(currentEvent);

while (!holdingQueue.isEmpty()) {
final OrderedDownstreamChannelEvent nextEvent = holdingQueue.peek();

if (nextEvent.getOrderedUpstreamMessageEvent().getSequence() != nextRequiredSequence |
nextEvent.getSubsequence() != nextRequiredSubsequence) {
break;
}
holdingQueue.remove();
ctx.sendDownstream(nextEvent.getChannelEvent());
if (nextEvent.isLast()) {
++nextRequiredSequence;
nextRequiredSubsequence = 0;
} else {
++nextRequiredSubsequence;
}
}

} else {
channelShouldClose = true;
}
}

if (channelShouldClose) {
Channels.close(e.getChannel());
}
} else {
super.handleDownstream(ctx, e);
}
}

}
Loading