Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended ActionFilter to also enable filtering the response side #7465

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 52 additions & 5 deletions src/main/java/org/elasticsearch/action/support/ActionFilter.java
Expand Up @@ -21,20 +21,67 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;

/**
* A filter allowing to filter transport actions
*/
public interface ActionFilter {

/**
* Filters the actual execution of the request by either sending a response through the {@link ActionListener}
* or continuing the filters execution through the {@link ActionFilterChain}
* The position of the filter in the chain. Execution is done from lowest order to highest.
*/
void process(final String action, final ActionRequest actionRequest, final ActionListener actionListener, final ActionFilterChain actionFilterChain);
int order();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe call it ord or position?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the name right now is aligned with other "ordered" constructs in es (e.g. index templates, rest filters, etc...)


/**
* The position of the filter in the chain. Execution is done from lowest order to highest.
* Enables filtering the execution of an action on the request side, either by sending a response through the
* {@link ActionListener} or by continuing the execution through the given {@link ActionFilterChain chain}
*/
int order();
void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain);

/**
* Enables filtering the execution of an action on the response side, either by sending a response through the
* {@link ActionListener} or by continuing the execution through the given {@link ActionFilterChain chain}
*/
void apply(String action, ActionResponse response, ActionListener listener, ActionFilterChain chain);

/**
* A simple base class for injectable action filters that spares the implementation from handling the
* filter chain. This base class should serve any action filter implementations that doesn't require
* to apply async filtering logic.
*/
public static abstract class Simple extends AbstractComponent implements ActionFilter {

protected Simple(Settings settings) {
super(settings);
}

@Override
public final void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
if (apply(action, request, listener)) {
chain.proceed(action, request, listener);
}
}

/**
* Applies this filter and returns {@code true} if the execution chain should proceed, or {@code false}
* if it should be aborted since the filter already handled the request and called the given listener.
*/
protected abstract boolean apply(String action, ActionRequest request, ActionListener listener);

@Override
public final void apply(String action, ActionResponse response, ActionListener listener, ActionFilterChain chain) {
if (apply(action, response, listener)) {
chain.proceed(action, response, listener);
}
}

/**
* Applies this filter and returns {@code true} if the execution chain should proceed, or {@code false}
* if it should be aborted since the filter already handled the response by calling the given listener.
*/
protected abstract boolean apply(String action, ActionResponse response, ActionListener listener);
}
}
Expand Up @@ -21,14 +21,22 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;

/**
* A filter chain allowing to continue and process the transport action request
*/
public interface ActionFilterChain {

/**
* Continue processing the request. Should only be called if a response has not been sent through the {@link ActionListener}
* Continue processing the request. Should only be called if a response has not been sent through
* the given {@link ActionListener listener}
*/
void continueProcessing(final String action, final ActionRequest request, final ActionListener actionListener);
void proceed(final String action, final ActionRequest request, final ActionListener listener);

/**
* Continue processing the response. Should only be called if a response has not been sent through
* the given {@link ActionListener listener}
*/
void proceed(final String action, final ActionResponse response, final ActionListener listener);
}
95 changes: 81 additions & 14 deletions src/main/java/org/elasticsearch/action/support/TransportAction.java
Expand Up @@ -78,8 +78,8 @@ public final void execute(Request request, ActionListener<Response> listener) {
listener.onFailure(t);
}
} else {
ActionFilterChain actionFilterChain = new TransportActionFilterChain();
actionFilterChain.continueProcessing(actionName, request, listener);
RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
requestFilterChain.proceed(actionName, request, listener);
}
}

Expand Down Expand Up @@ -146,28 +146,95 @@ public void run() {
}
}

private class TransportActionFilterChain implements ActionFilterChain {
private static class RequestFilterChain<Request extends ActionRequest, Response extends ActionResponse> implements ActionFilterChain {

private final TransportAction<Request, Response> action;
private final AtomicInteger index = new AtomicInteger();
private final ESLogger logger;

@SuppressWarnings("unchecked")
@Override
public void continueProcessing(String action, ActionRequest actionRequest, ActionListener actionListener) {
private RequestFilterChain(TransportAction<Request, Response> action, ESLogger logger) {
this.action = action;
this.logger = logger;
}

@Override @SuppressWarnings("unchecked")
public void proceed(String actionName, ActionRequest request, ActionListener listener) {
int i = index.getAndIncrement();
try {
if (i < filters.length) {
filters[i].process(action, actionRequest, actionListener, this);
} else if (i == filters.length) {
ActionListener<Response> listener = (ActionListener<Response>) actionListener;
Request request = (Request) actionRequest;
doExecute(request, listener);
if (i < this.action.filters.length) {
this.action.filters[i].apply(actionName, request, listener, this);
} else if (i == this.action.filters.length) {
this.action.doExecute((Request) request, new FilteredActionListener<Response>(actionName, listener, new ResponseFilterChain(this.action.filters, logger)));
} else {
actionListener.onFailure(new IllegalStateException("continueProcessing was called too many times"));
listener.onFailure(new IllegalStateException("proceed was called too many times"));
}
} catch(Throwable t) {
logger.trace("Error during transport action execution.", t);
actionListener.onFailure(t);
listener.onFailure(t);
}
}

@Override
public void proceed(String action, ActionResponse response, ActionListener listener) {
assert false : "request filter chain should never be called on the response side";
}
}

private static class ResponseFilterChain implements ActionFilterChain {

private final ActionFilter[] filters;
private final AtomicInteger index;
private final ESLogger logger;

private ResponseFilterChain(ActionFilter[] filters, ESLogger logger) {
this.filters = filters;
this.index = new AtomicInteger(filters.length);
this.logger = logger;
}

@Override
public void proceed(String action, ActionRequest request, ActionListener listener) {
assert false : "response filter chain should never be called on the request side";
}

@Override @SuppressWarnings("unchecked")
public void proceed(String action, ActionResponse response, ActionListener listener) {
int i = index.decrementAndGet();
try {
if (i >= 0) {
filters[i].apply(action, response, listener, this);
} else if (i == -1) {
listener.onResponse(response);
} else {
listener.onFailure(new IllegalStateException("proceed was called too many times"));
}
} catch (Throwable t) {
logger.trace("Error during transport action execution.", t);
listener.onFailure(t);
}
}
}

private static class FilteredActionListener<Response extends ActionResponse> implements ActionListener<Response> {

private final String actionName;
private final ActionListener listener;
private final ResponseFilterChain chain;

private FilteredActionListener(String actionName, ActionListener listener, ResponseFilterChain chain) {
this.actionName = actionName;
this.listener = listener;
this.chain = chain;
}

@Override
public void onResponse(Response response) {
chain.proceed(actionName, response, listener);
}

@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
}
}