Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

revamp TransportRequest handlers to support Writeable #26315

Merged
merged 1 commit into from
Aug 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public ActionRequest() {
// this.listenerThreaded = request.listenerThreaded();
}

public ActionRequest(StreamInput in) throws IOException {
super(in);
}

public abstract ActionRequestValidationException validate();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.ConfigurationUtils;
Expand Down Expand Up @@ -68,6 +69,18 @@ public SimulatePipelineRequest(BytesReference source, XContentType xContentType)
SimulatePipelineRequest() {
}

SimulatePipelineRequest(StreamInput in) throws IOException {
super(in);
id = in.readOptionalString();
verbose = in.readBoolean();
source = in.readBytesReference();
if (in.getVersion().onOrAfter(Version.V_5_3_0)) {
xContentType = XContentType.readFrom(in);
} else {
xContentType = XContentFactory.xContentType(source);
}
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand Down Expand Up @@ -99,15 +112,7 @@ public XContentType getXContentType() {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readOptionalString();
verbose = in.readBoolean();
source = in.readBytesReference();
if (in.getVersion().onOrAfter(Version.V_5_3_0)) {
xContentType = XContentType.readFrom(in);
} else {
xContentType = XContentFactory.xContentType(source);
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu

@Inject
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new);
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, SimulatePipelineRequest::new, indexNameExpressionResolver);
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
this.executionService = new SimulateExecutionService(threadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -43,6 +44,12 @@ protected HandledTransportAction(Settings settings, String actionName, ThreadPoo
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
}

protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
IndexNameExpressionResolver indexNameExpressionResolver) {
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, requestReader);
}

protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
Expand All @@ -51,6 +58,14 @@ protected HandledTransportAction(Settings settings, String actionName, boolean c
new TransportHandler());
}

protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, canTripCircuitBreaker, requestReader,
new TransportHandler());
}

class TransportHandler implements TransportRequestHandler<Request> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.common.io.stream;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Implementers can be written to a {@linkplain StreamOutput} and read from a {@linkplain StreamInput}. This allows them to be "thrown
Expand All @@ -43,4 +44,12 @@ public interface Streamable {
* Write this object's fields to a {@linkplain StreamOutput}.
*/
void writeTo(StreamOutput out) throws IOException;

static <T extends Streamable> Writeable.Reader<T> newWriteableReader(Supplier<T> supplier) {
return (StreamInput in) -> {
T request = supplier.get();
request.readFrom(in);
return request;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.transport;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;

Expand All @@ -32,15 +34,14 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
private final boolean forceExecution;
private final boolean canTripCircuitBreaker;
private final String executor;
private final Supplier<Request> requestFactory;
private final TaskManager taskManager;
private final Writeable.Reader<Request> requestReader;

public RequestHandlerRegistry(String action, Supplier<Request> requestFactory, TaskManager taskManager,
public RequestHandlerRegistry(String action, Writeable.Reader<Request> requestReader, TaskManager taskManager,
TransportRequestHandler<Request> handler, String executor, boolean forceExecution,
boolean canTripCircuitBreaker) {
this.action = action;
this.requestFactory = requestFactory;
assert newRequest() != null;
this.requestReader = requestReader;
this.handler = handler;
this.forceExecution = forceExecution;
this.canTripCircuitBreaker = canTripCircuitBreaker;
Expand All @@ -52,8 +53,8 @@ public String getAction() {
return action;
}

public Request newRequest() {
return requestFactory.get();
public Request newRequest(StreamInput in) throws IOException {
return requestReader.read(in);
}

public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1475,9 +1475,8 @@ protected String handleRequest(Channel channel, String profileName, final Stream
}
transportChannel = new TcpTransportChannel<>(this, channel, transportName, action, requestId, version, profileName,
messageLengthBytes);
final TransportRequest request = reg.newRequest();
final TransportRequest request = reg.newRequest(stream);
request.remoteAddress(new TransportAddress(remoteAddress));
request.readFrom(stream);
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
validateRequest(stream, requestId, action);
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -97,11 +98,11 @@ public String executor() {

static class ProxyRequest<T extends TransportRequest> extends TransportRequest {
T wrapped;
Supplier<T> supplier;
Writeable.Reader<T> reader;
DiscoveryNode targetNode;

ProxyRequest(Supplier<T> supplier) {
this.supplier = supplier;
ProxyRequest(Writeable.Reader<T> reader) {
this.reader = reader;
}

ProxyRequest(T wrapped, DiscoveryNode targetNode) {
Expand All @@ -113,8 +114,7 @@ static class ProxyRequest<T extends TransportRequest> extends TransportRequest {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
targetNode = new DiscoveryNode(in);
wrapped = supplier.get();
wrapped.readFrom(in);
wrapped = reader.read(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.transport.TransportAddress;

import java.io.IOException;

public abstract class TransportMessage implements Streamable {
public abstract class TransportMessage implements Streamable, Writeable {

private TransportAddress remoteAddress;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public static class Empty extends TransportRequest {
public TransportRequest() {
}

public TransportRequest(StreamInput in) throws IOException {
parentTaskId = TaskId.readFromStream(in);
}

/**
* Set a reference to task that created this request.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex;
Expand Down Expand Up @@ -709,7 +711,24 @@ public <Request extends TransportRequest> void registerRequestHandler(String act
String executor, TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, false, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, requestFactory, taskManager, handler, executor, false, true);
action, Streamable.newWriteableReader(requestFactory), taskManager, handler, executor, false, true);
registerRequestHandler(reg);
}

/**
* Registers a new request handler
*
* @param action The action the request handler is associated with
* @param requestReader a callable to be used construct new instances for streaming
* @param executor The executor the request handling will be executed on
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, String executor,
Writeable.Reader<Request> requestReader,
TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, false, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, requestReader, taskManager, handler, executor, false, true);
registerRequestHandler(reg);
}

Expand All @@ -729,7 +748,28 @@ public <Request extends TransportRequest> void registerRequestHandler(String act
TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
action, Streamable.newWriteableReader(request), taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
registerRequestHandler(reg);
}

/**
* Registers a new request handler
*
* @param action The action the request handler is associated with
* @param requestReader The request class that will be used to construct new instances for streaming
* @param executor The executor the request handling will be executed on
* @param forceExecution Force execution on the executor queue and never reject it
* @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached.
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action,
String executor, boolean forceExecution,
boolean canTripCircuitBreaker,
Writeable.Reader<Request> requestReader,
TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, requestReader, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
registerRequestHandler(reg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ public void testSerialization() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
request.writeTo(out);
StreamInput streamInput = out.bytes().streamInput();
SimulatePipelineRequest otherRequest = new SimulatePipelineRequest();
otherRequest.readFrom(streamInput);
SimulatePipelineRequest otherRequest = new SimulatePipelineRequest(streamInput);

assertThat(otherRequest.getId(), equalTo(request.getId()));
assertThat(otherRequest.isVerbose(), equalTo(request.isVerbose()));
Expand All @@ -65,8 +64,7 @@ public void testSerializationWithXContent() throws IOException {
request.writeTo(output);
StreamInput in = StreamInput.wrap(output.bytes().toBytesRef().bytes);

SimulatePipelineRequest serialized = new SimulatePipelineRequest();
serialized.readFrom(in);
SimulatePipelineRequest serialized = new SimulatePipelineRequest(in);
assertEquals(XContentType.JSON, serialized.getXContentType());
assertEquals("{}", serialized.getSource().utf8ToString());
}
Expand All @@ -77,8 +75,7 @@ public void testSerializationWithXContentBwc() throws IOException {
Version.V_5_1_1, Version.V_5_1_2, Version.V_5_2_0);
try (StreamInput in = StreamInput.wrap(data)) {
in.setVersion(version);
SimulatePipelineRequest request = new SimulatePipelineRequest();
request.readFrom(in);
SimulatePipelineRequest request = new SimulatePipelineRequest(in);
assertEquals(XContentType.JSON, request.getXContentType());
assertEquals("{}", request.getSource().utf8ToString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public void testIsProxyAction() {
}

public void testIsProxyRequest() {
assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>(() -> null)));
assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>((in) -> null)));
assertFalse(TransportActionProxy.isProxyRequest(TransportRequest.Empty.INSTANCE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,7 @@ protected void sendRequest(Connection connection, long requestId, String action,
RequestHandlerRegistry reg = MockTransportService.this.getRequestHandler(action);
BytesStreamOutput bStream = new BytesStreamOutput();
request.writeTo(bStream);
final TransportRequest clonedRequest = reg.newRequest();
clonedRequest.readFrom(bStream.bytes().streamInput());
final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput());

Runnable runnable = new AbstractRunnable() {
AtomicBoolean requestSent = new AtomicBoolean();
Expand Down