Skip to content

Commit

Permalink
Always compress based on the settings (#36522)
Browse files Browse the repository at this point in the history
Currently TransportRequestOptions allows specific requests to request
compression. This commit removes this and always compresses based on the
settings. Additionally, it removes TransportResponseOptions as they
are unused.

This closes #36399.
  • Loading branch information
Tim-Brooks committed Dec 12, 2018
1 parent 02d0f16 commit 7f612d5
Show file tree
Hide file tree
Showing 17 changed files with 45 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ private void runOnNodeWithTaskIfPossible(Task thisTask, GetTaskRequest request,
if (request.getTimeout() != null) {
builder.withTimeout(request.getTimeout());
}
builder.withCompress(false);
DiscoveryNode node = clusterService.state().nodes().get(request.getTaskId().getNodeId());
if (node == null) {
// Node is no longer part of the cluster! Try and look the task up from the results index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ public TransportNodesSnapshotsStatus(ThreadPool threadPool, ClusterService clust
this.snapshotShardsService = snapshotShardsService;
}

@Override
protected boolean transportCompress() {
return true; // compress since the metadata can become large
}

@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ public BulkResponse newResponse() {

@Override
public TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.BULK)
.withCompress(settings.getAsBoolean("action.bulk.compress", true)
).build();
return TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ protected void doExecute(Task task, NodesRequest request, ActionListener<NodesRe
new AsyncAction(task, request, listener).start();
}

protected boolean transportCompress() {
return false;
}

/**
* Map the responses into {@code nodeResponseClass} responses and {@link FailedNodeException}s.
*
Expand Down Expand Up @@ -173,7 +169,6 @@ void start() {
if (request.timeout() != null) {
builder.withTimeout(request.timeout());
}
builder.withCompress(transportCompress());
for (int i = 0; i < nodes.length; i++) {
final int idx = i;
final DiscoveryNode node = nodes[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,6 @@ protected TasksResponse newResponse(TasksRequest request, AtomicReferenceArray r
*/
protected abstract void taskOperation(TasksRequest request, OperationTask task, ActionListener<TaskResponse> listener);

protected boolean transportCompress() {
return false;
}

private class AsyncAction {

private final TasksRequest request;
Expand Down Expand Up @@ -255,7 +251,6 @@ private void start() {
if (request.getTimeout() != null) {
builder.withTimeout(request.getTimeout());
}
builder.withCompress(transportCompress());
for (int i = 0; i < nodesIds.length; i++) {
final String nodeId = nodesIds[i];
final int idx = i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public class PublicationTransportHandler {
private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();
// -> no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE).build();

public PublicationTransportHandler(TransportService transportService, NamedWriteableRegistry namedWriteableRegistry,
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
Expand Down Expand Up @@ -213,7 +217,6 @@ public String toString() {
@Override
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
ActionListener<TransportResponse.Empty> responseActionListener) {
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(destination)) {
Expand All @@ -223,7 +226,7 @@ public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyC
actionName = COMMIT_STATE_ACTION_NAME;
transportRequest = applyCommitRequest;
}
transportService.sendRequest(destination, actionName, transportRequest, options,
transportService.sendRequest(destination, actionName, transportRequest, stateRequestOptions,
new TransportResponseHandler<TransportResponse.Empty>() {

@Override
Expand Down Expand Up @@ -254,11 +257,6 @@ private void sendClusterStateToNode(ClusterState clusterState, BytesReference by
ActionListener<PublishWithJoinResponse> responseActionListener, boolean sendDiffs,
Map<Version, BytesReference> serializedStates) {
try {
// -> no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
// -> no need to compress, we already compressed the bytes
final TransportRequestOptions options = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion());
final Consumer<TransportException> transportExceptionHandler = exp -> {
if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
Expand Down Expand Up @@ -304,7 +302,7 @@ public String executor() {
actionName = PUBLISH_STATE_ACTION_NAME;
transportResponseHandler = publishWithJoinResponseHandler;
}
transportService.sendRequest(node, actionName, request, options, transportResponseHandler);
transportService.sendRequest(node, actionName, request, stateRequestOptions, transportResponseHandler);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
responseActionListener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public class PublishClusterStateAction {
public static final String SEND_ACTION_NAME = "internal:discovery/zen/publish/send";
public static final String COMMIT_ACTION_NAME = "internal:discovery/zen/publish/commit";

// -> no need to put a timeout on the options, because we want the state response to eventually be received
// and not log an error if it arrives after the timeout
private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE).build();

public interface IncomingClusterStateListener {

/**
Expand Down Expand Up @@ -284,14 +289,9 @@ private void sendClusterStateToNode(final ClusterState clusterState, BytesRefere
final boolean sendDiffs, final Map<Version, BytesReference> serializedStates) {
try {

// -> no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
// -> no need to compress, we already compressed the bytes
TransportRequestOptions options = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
transportService.sendRequest(node, SEND_ACTION_NAME,
new BytesTransportRequest(bytes, node.getVersion()),
options,
stateRequestOptions,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

@Override
Expand Down Expand Up @@ -324,12 +324,9 @@ private void sendCommitToNode(final DiscoveryNode node, final ClusterState clust
try {
logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]",
clusterState.stateUUID(), clusterState.version(), node);
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
// no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
transportService.sendRequest(node, COMMIT_ACTION_NAME,
new CommitClusterStateRequest(clusterState.stateUUID()),
options,
stateRequestOptions,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable Tim
return future;
}

@Override
protected boolean transportCompress() {
return true; // compress since the metadata can become large
}

@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,6 @@ public void list(ShardId shardId, DiscoveryNode[] nodes,
execute(new Request(shardId, nodes), listener);
}

@Override
protected boolean transportCompress() {
return true; // this can become big...
}

@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,10 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
this.recoverySettings = recoverySettings;
this.onSourceThrottle = onSourceThrottle;
this.translogOpsRequestOptions = TransportRequestOptions.builder()
.withCompress(true)
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionLongTimeout())
.build();
this.fileChunkRequestOptions = TransportRequestOptions.builder()
.withCompress(false) // lucene files are already compressed and therefore compressing this won't really help much so
// we are saving the cpu for other things
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
Expand Down
39 changes: 16 additions & 23 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final boolean compressResponses;
private final boolean compressAllResponses;
private volatile BoundTransportAddress boundAddress;
private final String transportName;

Expand All @@ -220,16 +220,16 @@ public TcpTransport(String transportName, Settings settings, Version version, T
this.pageCacheRecycler = pageCacheRecycler;
this.circuitBreakerService = circuitBreakerService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.compressResponses = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
this.compressAllResponses = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
this.networkService = networkService;
this.transportName = transportName;
this.transportLogger = new TransportLogger();
this.handshaker = new TransportHandshaker(version, threadPool,
(node, channel, requestId, v) -> sendRequestToChannel(node, channel, requestId,
TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
TransportRequestOptions.EMPTY, v, TransportStatus.setHandshake((byte) 0)),
TransportRequestOptions.EMPTY, v, false, TransportStatus.setHandshake((byte) 0)),
(v, features, channel, response, requestId) -> sendResponse(v, features, channel, response, requestId,
TransportHandshaker.HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY, TransportStatus.setHandshake((byte) 0)));
TransportHandshaker.HANDSHAKE_ACTION_NAME, false, TransportStatus.setHandshake((byte) 0)));
this.keepAlive = new TransportKeepAlive(threadPool, this::internalSendMessage);
this.nodeName = Node.NODE_NAME_SETTING.get(settings);

Expand Down Expand Up @@ -337,11 +337,7 @@ public void sendRequest(long requestId, String action, TransportRequest request,
throw new NodeNotConnectedException(node, "connection already closed");
}
TcpChannel channel = channel(options.type());

if (compress) {
options = TransportRequestOptions.builder(options).withCompress(true).build();
}
sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte) 0);
sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), compress, (byte) 0);
}
}

Expand Down Expand Up @@ -768,11 +764,11 @@ private boolean canCompress(TransportRequest request) {

private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action,
final TransportRequest request, TransportRequestOptions options, Version channelVersion,
byte status) throws IOException, TransportException {
boolean compressRequest, byte status) throws IOException, TransportException {

// only compress if asked and the request is not bytes. Otherwise only
// the header part is compressed, and the "body" can't be extracted as compressed
final boolean compressMessage = options.compress() && canCompress(request);
final boolean compressMessage = compressRequest && canCompress(request);

status = TransportStatus.setRequest(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
Expand Down Expand Up @@ -871,8 +867,8 @@ public void sendResponse(
final TransportResponse response,
final long requestId,
final String action,
final TransportResponseOptions options) throws IOException {
sendResponse(nodeVersion, features, channel, response, requestId, action, options, (byte) 0);
final boolean compress) throws IOException {
sendResponse(nodeVersion, features, channel, response, requestId, action, compress, (byte) 0);
}

private void sendResponse(
Expand All @@ -882,29 +878,26 @@ private void sendResponse(
final TransportResponse response,
final long requestId,
final String action,
TransportResponseOptions options,
boolean compress,
byte status) throws IOException {
if (compressResponses && options.compress() == false) {
options = TransportResponseOptions.builder(options).withCompress(true).build();
}
boolean compressMessage = compress || compressAllResponses;

status = TransportStatus.setResponse(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, options.compress());
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compressMessage);
boolean addedReleaseListener = false;
try {
if (options.compress()) {
if (compressMessage) {
status = TransportStatus.setCompress(status);
}
threadPool.getThreadContext().writeTo(stream);
stream.setVersion(nodeVersion);
stream.setFeatures(features);
BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream);

final TransportResponseOptions finalOptions = options;
// this might be called in a different thread
ReleaseListener releaseListener = new ReleaseListener(stream,
() -> messageListener.onResponseSent(requestId, action, response, finalOptions));
() -> messageListener.onResponseSent(requestId, action, response));
internalSendMessage(channel, message, releaseListener);
addedReleaseListener = true;
} finally {
Expand Down Expand Up @@ -1530,9 +1523,9 @@ public void onRequestReceived(long requestId, String action) {
}

@Override
public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {
public void onResponseSent(long requestId, String action, TransportResponse response) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, response, finalOptions);
listener.onResponseSent(requestId, action, response);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,7 @@ public String getProfileName() {
@Override
public void sendResponse(TransportResponse response) throws IOException {
try {
TransportResponseOptions options;
if (compressResponse) {
options = TransportResponseOptions.builder().withCompress(true).build();
} else {
options = TransportResponseOptions.EMPTY;
}
transport.sendResponse(version, features, channel, response, requestId, action, options);
transport.sendResponse(version, features, channel, response, requestId, action, compressResponse);
} finally {
release(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ default void onRequestReceived(long requestId, String action) {}
* @param requestId the request ID (unique per client)
* @param action the request action
* @param response the response send
* @param finalOptions the response options
*/
default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {}
default void onResponseSent(long requestId, String action, TransportResponse response) {}

/***
* Called for every failed action response after the response has been passed to the underlying network implementation.
Expand Down

0 comments on commit 7f612d5

Please sign in to comment.