diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 93fae6a0113ed..b5be48193449e 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -37,7 +37,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; -import java.io.IOException; import java.util.Map; /** @@ -116,7 +115,7 @@ private void publish(ClusterState clusterState, final ClusterStatePublishRespons // no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, - new PublishClusterStateRequest(bytes, node.version()), + new BytesTransportRequest(bytes, node.version()), options, // no need to compress, we already compressed the bytes new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @@ -152,52 +151,25 @@ public void handleException(TransportException exp) { } } - class PublishClusterStateRequest extends TransportRequest { - - BytesReference clusterStateInBytes; - Version version; - - PublishClusterStateRequest() { - } - - PublishClusterStateRequest(BytesReference clusterStateInBytes, Version version) { - this.clusterStateInBytes = clusterStateInBytes; - this.version = version; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - clusterStateInBytes = in.readBytesReference(); - version = in.getVersion(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBytesReference(clusterStateInBytes); - } - } - - private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler { + private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler { static final String ACTION = "discovery/zen/publish"; @Override - public PublishClusterStateRequest newInstance() { - return new PublishClusterStateRequest(); + public BytesTransportRequest newInstance() { + return new BytesTransportRequest(); } @Override - public void messageReceived(PublishClusterStateRequest request, final TransportChannel channel) throws Exception { - Compressor compressor = CompressorFactory.compressor(request.clusterStateInBytes); + public void messageReceived(BytesTransportRequest request, final TransportChannel channel) throws Exception { + Compressor compressor = CompressorFactory.compressor(request.bytes()); StreamInput in; if (compressor != null) { - in = CachedStreamInput.cachedHandlesCompressed(compressor, request.clusterStateInBytes.streamInput()); + in = CachedStreamInput.cachedHandlesCompressed(compressor, request.bytes().streamInput()); } else { - in = CachedStreamInput.cachedHandles(request.clusterStateInBytes.streamInput()); + in = CachedStreamInput.cachedHandles(request.bytes().streamInput()); } - in.setVersion(request.version); + in.setVersion(request.version()); ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); logger.debug("received cluster state version {}", clusterState.version()); listener.onNewClusterState(clusterState, new NewClusterStateListener.NewStateProcessed() { diff --git a/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java b/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java new file mode 100644 index 0000000000000..eca17b9014cd1 --- /dev/null +++ b/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java @@ -0,0 +1,76 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * A specialized, bytes only request, that can potentially be optimized on the network + * layer, specifically for teh same large buffer send to several nodes. + */ +public class BytesTransportRequest extends TransportRequest { + + BytesReference bytes; + Version version; + + public BytesTransportRequest() { + + } + + public BytesTransportRequest(BytesReference bytes, Version version) { + this.bytes = bytes; + this.version = version; + } + + public Version version() { + return this.version; + } + + public BytesReference bytes() { + return this.bytes; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + bytes = in.readBytesReference(); + version = in.getVersion(); + } + + /** + * Writes the data in a "thin" manner, without the actual bytes, assumes + * the actual bytes will be appended right after this content. + */ + public void writeThin(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(bytes.length()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBytesReference(bytes); + } +} diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 275bff900d18a..1a50c689e1084 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -53,6 +53,7 @@ import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.CompositeChannelBuffer; import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; @@ -554,10 +555,25 @@ public void sendRequest(final DiscoveryNode node, final long requestId, final St stream.setVersion(version); stream.writeString(action); - request.writeTo(stream); - stream.close(); - ChannelBuffer buffer = bStream.bytes().toChannelBuffer(); + ChannelBuffer buffer; + // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output + // that create paged channel buffers, but its tricky to know when to do it (where this option is + // more explicit). + if (request instanceof BytesTransportRequest) { + BytesTransportRequest bRequest = (BytesTransportRequest) request; + assert node.version().equals(bRequest.version()); + bRequest.writeThin(stream); + stream.close(); + ChannelBuffer headerBuffer = bStream.bytes().toChannelBuffer(); + ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer(); + // false on gathering, cause gathering causes the NIO layer to combine the buffers into a single direct buffer.... + buffer = new CompositeChannelBuffer(headerBuffer.order(), ImmutableList.of(headerBuffer, contentBuffer), false); + } else { + request.writeTo(stream); + stream.close(); + buffer = bStream.bytes().toChannelBuffer(); + } NettyHeader.writeHeader(buffer, requestId, status, version); targetChannel.write(buffer);