Skip to content

Commit

Permalink
Introduce BytesTransportRequest, allowing for downstream network opti…
Browse files Browse the repository at this point in the history
…mization in buffers usage

When sending a request, mainly to multiple nodes, if we already have the "body" of the request in bytes, we can share it instead of copying it over to a new buffer. Also, it helps a lot when sending a relatively large body to multiple nodes, since it will use the same body buffer across all nodes
  • Loading branch information
kimchy committed Dec 10, 2013
1 parent c4f3da2 commit a9e259d
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;

import java.io.IOException;
import java.util.Map;

/**
Expand Down Expand Up @@ -116,7 +115,7 @@ private void publish(ClusterState clusterState, final ClusterStatePublishRespons
// no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION,
new PublishClusterStateRequest(bytes, node.version()),
new BytesTransportRequest(bytes, node.version()),
options, // no need to compress, we already compressed the bytes

new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
Expand Down Expand Up @@ -152,52 +151,25 @@ public void handleException(TransportException exp) {
}
}

class PublishClusterStateRequest extends TransportRequest {

BytesReference clusterStateInBytes;
Version version;

PublishClusterStateRequest() {
}

PublishClusterStateRequest(BytesReference clusterStateInBytes, Version version) {
this.clusterStateInBytes = clusterStateInBytes;
this.version = version;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterStateInBytes = in.readBytesReference();
version = in.getVersion();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(clusterStateInBytes);
}
}

private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler<PublishClusterStateRequest> {
private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler<BytesTransportRequest> {

static final String ACTION = "discovery/zen/publish";

@Override
public PublishClusterStateRequest newInstance() {
return new PublishClusterStateRequest();
public BytesTransportRequest newInstance() {
return new BytesTransportRequest();
}

@Override
public void messageReceived(PublishClusterStateRequest request, final TransportChannel channel) throws Exception {
Compressor compressor = CompressorFactory.compressor(request.clusterStateInBytes);
public void messageReceived(BytesTransportRequest request, final TransportChannel channel) throws Exception {
Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in;
if (compressor != null) {
in = CachedStreamInput.cachedHandlesCompressed(compressor, request.clusterStateInBytes.streamInput());
in = CachedStreamInput.cachedHandlesCompressed(compressor, request.bytes().streamInput());
} else {
in = CachedStreamInput.cachedHandles(request.clusterStateInBytes.streamInput());
in = CachedStreamInput.cachedHandles(request.bytes().streamInput());
}
in.setVersion(request.version);
in.setVersion(request.version());
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
logger.debug("received cluster state version {}", clusterState.version());
listener.onNewClusterState(clusterState, new NewClusterStateListener.NewStateProcessed() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* A specialized, bytes only request, that can potentially be optimized on the network
* layer, specifically for teh same large buffer send to several nodes.
*/
public class BytesTransportRequest extends TransportRequest {

BytesReference bytes;
Version version;

public BytesTransportRequest() {

}

public BytesTransportRequest(BytesReference bytes, Version version) {
this.bytes = bytes;
this.version = version;
}

public Version version() {
return this.version;
}

public BytesReference bytes() {
return this.bytes;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
bytes = in.readBytesReference();
version = in.getVersion();
}

/**
* Writes the data in a "thin" manner, without the actual bytes, assumes
* the actual bytes will be appended right after this content.
*/
public void writeThin(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(bytes.length());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.CompositeChannelBuffer;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
Expand Down Expand Up @@ -554,10 +555,25 @@ public void sendRequest(final DiscoveryNode node, final long requestId, final St

stream.setVersion(version);
stream.writeString(action);
request.writeTo(stream);
stream.close();

ChannelBuffer buffer = bStream.bytes().toChannelBuffer();
ChannelBuffer buffer;
// it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
// that create paged channel buffers, but its tricky to know when to do it (where this option is
// more explicit).
if (request instanceof BytesTransportRequest) {
BytesTransportRequest bRequest = (BytesTransportRequest) request;
assert node.version().equals(bRequest.version());
bRequest.writeThin(stream);
stream.close();
ChannelBuffer headerBuffer = bStream.bytes().toChannelBuffer();
ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
// false on gathering, cause gathering causes the NIO layer to combine the buffers into a single direct buffer....
buffer = new CompositeChannelBuffer(headerBuffer.order(), ImmutableList.<ChannelBuffer>of(headerBuffer, contentBuffer), false);
} else {
request.writeTo(stream);
stream.close();
buffer = bStream.bytes().toChannelBuffer();
}
NettyHeader.writeHeader(buffer, requestId, status, version);
targetChannel.write(buffer);

Expand Down

0 comments on commit a9e259d

Please sign in to comment.