Skip to content

Commit

Permalink
Ensure replica requests are marked as index_data (elastic#75008)
Browse files Browse the repository at this point in the history
This is related to elastic#73497. Currently replica requests are wrapped in a
concrete replica shard request. This leads to the transport layer not
properly identifying them as replica index_data requests and not
compressing them properly. This commit resolves this bug.
  • Loading branch information
Tim-Brooks committed Jul 6, 2021
1 parent 90d55bc commit 5b0d98c
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RawIndexingDataTransportRequest;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
Expand Down Expand Up @@ -1098,7 +1099,8 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, l
}

/** a wrapper class to encapsulate a request when being sent to a specific allocation id **/
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest {
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest
implements RawIndexingDataTransportRequest {

/** {@link AllocationId#getId()} of the shard this request is sent to **/
private final String targetAllocationID;
Expand Down Expand Up @@ -1189,6 +1191,14 @@ public long getPrimaryTerm() {
return primaryTerm;
}

@Override
public boolean isRawIndexingData() {
if (request instanceof RawIndexingDataTransportRequest) {
return ((RawIndexingDataTransportRequest) request).isRawIndexingData();
}
return false;
}

@Override
public String toString() {
return "request: " + request + ", target allocation id: " + targetAllocationID + ", primary term: " + primaryTerm;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@

/**
* Requests that implement this interface will be compressed when {@link TransportSettings#TRANSPORT_COMPRESS}
* is configured to {@link Compression.Enabled#INDEXING_DATA}. This is primary intended to be
* requests/responses primarily composed of raw source data.
* is configured to {@link Compression.Enabled#INDEXING_DATA} and isRawIndexingData() returns true. This is
* intended to be requests/responses primarily composed of raw source data.
*/
public interface RawIndexingDataTransportRequest {

default boolean isRawIndexingData() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,13 @@ public void sendRequest(long requestId, String action, TransportRequest request,
throw new NodeNotConnectedException(node, "connection already closed");
}
TcpChannel channel = channel(options.type());
// We compress if total transport compression is enabled or if indexing_data transport compression
// is enabled and the request is a RawIndexingDataTransportRequest which indicates it should be
// compressed.
boolean shouldCompress = compress == Compression.Enabled.TRUE ||
(compress == Compression.Enabled.INDEXING_DATA && request instanceof RawIndexingDataTransportRequest);
(compress == Compression.Enabled.INDEXING_DATA
&& request instanceof RawIndexingDataTransportRequest
&& ((RawIndexingDataTransportRequest) request).isRawIndexingData());
outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), shouldCompress, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -87,8 +88,10 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -651,6 +654,74 @@ public void handleException(TransportException exp) {
}
}

public void testIndexingDataCompression() throws Exception {
try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY)) {
String component = "cccccccccooooooooooooooommmmmmmmmmmppppppppppprrrrrrrreeeeeeeeeessssssssiiiiiiiiiibbbbbbbbllllllllleeeeee";
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 30; ++i) {
builder.append(component);
}
String text = builder.toString();
TransportRequestHandler<StringMessageRequest> handler = (request, channel, task) -> {
assertThat(text, equalTo(request.message));
try {
channel.sendResponse(new StringMessageResponse(""));
} catch (IOException e) {
logger.error("Unexpected failure", e);
fail(e.getMessage());
}
};
serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, handler);
serviceC.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, handler);

Settings settingsWithCompress = Settings.builder()
.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.INDEXING_DATA)
.put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(),
randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4))
.build();
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile);
serviceA.connectToNode(serviceC.getLocalDiscoNode(), connectionProfile);

TransportResponseHandler<StringMessageResponse> responseHandler = new TransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse read(StreamInput in) throws IOException {
return new StringMessageResponse(in);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public void handleResponse(StringMessageResponse response) {
}

@Override
public void handleException(TransportException exp) {
logger.error("Unexpected failure", exp);
fail("got exception instead of a response: " + exp.getMessage());
}
};

Future<StringMessageResponse> compressed = serviceC.submitRequest(serviceA.getLocalDiscoNode(), "internal:sayHello",
new StringMessageRequest(text, -1, true), responseHandler);
Future<StringMessageResponse> uncompressed = serviceA.submitRequest(serviceC.getLocalDiscoNode(), "internal:sayHello",
new StringMessageRequest(text, -1, false), responseHandler);

compressed.get();
uncompressed.get();
final long bytesLength;
try (BytesStreamOutput output = new BytesStreamOutput()) {
new StringMessageRequest(text, -1).writeTo(output);
bytesLength = output.bytes().length();
}
assertThat(serviceA.transport().getStats().getRxSize().getBytes(), lessThan(bytesLength));
assertThat(serviceC.transport().getStats().getRxSize().getBytes(), greaterThan(bytesLength));
}
}

public void testErrorMessage() {
serviceA.registerRequestHandler("internal:sayHelloException", ThreadPool.Names.GENERIC, StringMessageRequest::new,
(request, channel, task) -> {
Expand Down Expand Up @@ -1188,14 +1259,20 @@ public void handleException(TransportException exp) {
}
}

public static class StringMessageRequest extends TransportRequest {
public static class StringMessageRequest extends TransportRequest implements RawIndexingDataTransportRequest {

private String message;
private long timeout;
private boolean isRawIndexingData = false;

StringMessageRequest(String message, long timeout) {
this(message, timeout, false);
}

StringMessageRequest(String message, long timeout, boolean isRawIndexingData) {
this.message = message;
this.timeout = timeout;
this.isRawIndexingData = isRawIndexingData;
}

public StringMessageRequest(StreamInput in) throws IOException {
Expand All @@ -1212,6 +1289,11 @@ public long timeout() {
return timeout;
}

@Override
public boolean isRawIndexingData() {
return isRawIndexingData;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down

0 comments on commit 5b0d98c

Please sign in to comment.