Skip to content

Commit

Permalink
Internal: refactor copy headers mechanism
Browse files Browse the repository at this point in the history
The functionality of copying headers in the REST layer (from REST requests to transport requests) remains the same. Made it a bit nicer by introducing a ClientFactory component that is a singleton and allows to register useful headers without requiring static methods.

Plugins just have to inject the ClientFactory now, and call its `addUsefulHeaders` method that is not static anymore.

Relates to elastic#6513
Closes elastic#7594
  • Loading branch information
javanna committed Sep 9, 2014
1 parent ee3cbce commit 8446948
Show file tree
Hide file tree
Showing 105 changed files with 445 additions and 494 deletions.
132 changes: 6 additions & 126 deletions src/main/java/org/elasticsearch/rest/BaseRestHandler.java
Expand Up @@ -19,151 +19,31 @@

package org.elasticsearch.rest;

import com.google.common.collect.Sets;
import org.elasticsearch.action.*;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;

import java.util.Collections;
import java.util.Set;

/**
* Base handler for REST requests.
*
* This handler makes sure that the headers & context of the handled {@link RestRequest requests} are copied over to
* the transport requests executed by the associated client. While the context is fully copied over, not all the headers
* are copied, but a selected few. It is possible to control what header are copied over by registering them using
* {@link #addUsefulHeaders(String...)}
* are copied, but a selected few. It is possible to control what headers are copied over by registering them using
* {@link org.elasticsearch.rest.ClientFactory#addUsefulHeaders(String...)}
*/
public abstract class BaseRestHandler extends AbstractComponent implements RestHandler {

private static Set<String> usefulHeaders = Sets.newCopyOnWriteArraySet();

/**
* Controls which REST headers get copied over from a {@link org.elasticsearch.rest.RestRequest} to
* its corresponding {@link org.elasticsearch.transport.TransportRequest}(s).
*
* By default no headers get copied but it is possible to extend this behaviour via plugins by calling this method.
*/
public static void addUsefulHeaders(String... headers) {
Collections.addAll(usefulHeaders, headers);
}
private final ClientFactory clientFactory;

static Set<String> usefulHeaders() {
return usefulHeaders;
}

private final Client client;

protected BaseRestHandler(Settings settings, Client client) {
protected BaseRestHandler(Settings settings, ClientFactory clientFactory) {
super(settings);
this.client = client;
this.clientFactory = clientFactory;
}

@Override
public final void handleRequest(RestRequest request, RestChannel channel) throws Exception {
handleRequest(request, channel, new HeadersAndContextCopyClient(client, request, usefulHeaders));
handleRequest(request, channel, clientFactory.client(request));
}

protected abstract void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception;

static final class HeadersAndContextCopyClient extends FilterClient {

private final RestRequest restRequest;
private final IndicesAdmin indicesAdmin;
private final ClusterAdmin clusterAdmin;
private final Set<String> headers;

HeadersAndContextCopyClient(Client in, RestRequest restRequest, Set<String> headers) {
super(in);
this.restRequest = restRequest;
this.indicesAdmin = new IndicesAdmin(in.admin().indices(), restRequest, headers);
this.clusterAdmin = new ClusterAdmin(in.admin().cluster(), restRequest, headers);
this.headers = headers;
}

private static void copyHeadersAndContext(ActionRequest actionRequest, RestRequest restRequest, Set<String> headers) {
for (String usefulHeader : headers) {
String headerValue = restRequest.header(usefulHeader);
if (headerValue != null) {
actionRequest.putHeader(usefulHeader, headerValue);
}
}
actionRequest.copyContextFrom(restRequest);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) {
copyHeadersAndContext(request, restRequest, headers);
return super.execute(action, request);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
copyHeadersAndContext(request, restRequest, headers);
super.execute(action, request, listener);
}

@Override
public ClusterAdminClient cluster() {
return clusterAdmin;
}

@Override
public IndicesAdminClient indices() {
return indicesAdmin;
}

private static final class ClusterAdmin extends FilterClient.ClusterAdmin {

private final RestRequest restRequest;
private final Set<String> headers;

private ClusterAdmin(ClusterAdminClient in, RestRequest restRequest, Set<String> headers) {
super(in);
this.restRequest = restRequest;
this.headers = headers;
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request) {
copyHeadersAndContext(request, restRequest, headers);
return super.execute(action, request);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request, ActionListener<Response> listener) {
copyHeadersAndContext(request, restRequest, headers);
super.execute(action, request, listener);
}
}

private final class IndicesAdmin extends FilterClient.IndicesAdmin {

private final RestRequest restRequest;
private final Set<String> headers;

private IndicesAdmin(IndicesAdminClient in, RestRequest restRequest, Set<String> headers) {
super(in);
this.restRequest = restRequest;
this.headers = headers;
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request) {
copyHeadersAndContext(request, restRequest, headers);
return super.execute(action, request);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request, ActionListener<Response> listener) {
copyHeadersAndContext(request, restRequest, headers);
super.execute(action, request, listener);
}
}
}
}
164 changes: 164 additions & 0 deletions src/main/java/org/elasticsearch/rest/ClientFactory.java
@@ -0,0 +1,164 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.rest;

import com.google.common.collect.Sets;
import org.elasticsearch.action.*;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.inject.Inject;

import java.util.Collections;
import java.util.Set;

/**
* Client factory that returns a proper {@link Client} given a {@link org.elasticsearch.rest.RestRequest}.
* Makes it possible to register useful headers that will be copied over from REST requests
* to corresponding transport requests at execution time.
*/
public final class ClientFactory {

private Set<String> usefulHeaders = Sets.newCopyOnWriteArraySet();
private final Client client;

@Inject
public ClientFactory(Client client) {
this.client = client;
}

/**
* Returns a proper {@link Client client} given the provided {@link org.elasticsearch.rest.RestRequest}
*/
public Client client(RestRequest restRequest) {
return usefulHeaders.size() == 0 ? client : new HeadersAndContextCopyClient(client, restRequest, usefulHeaders);
}

/**
* Controls which REST headers get copied over from a {@link org.elasticsearch.rest.RestRequest} to
* its corresponding {@link org.elasticsearch.transport.TransportRequest}(s).
*
* By default no headers get copied but it is possible to extend this behaviour via plugins by calling this method.
*/
public void addUsefulHeaders(String... headers) {
Collections.addAll(usefulHeaders, headers);
}

Set<String> usefulHeaders() {
return usefulHeaders;
}

static final class HeadersAndContextCopyClient extends FilterClient {

private final RestRequest restRequest;
private final IndicesAdmin indicesAdmin;
private final ClusterAdmin clusterAdmin;
private final Set<String> headers;

HeadersAndContextCopyClient(Client in, RestRequest restRequest, Set<String> headers) {
super(in);
this.restRequest = restRequest;
this.indicesAdmin = new IndicesAdmin(in.admin().indices(), restRequest, headers);
this.clusterAdmin = new ClusterAdmin(in.admin().cluster(), restRequest, headers);
this.headers = headers;
}

private static void copyHeadersAndContext(ActionRequest actionRequest, RestRequest restRequest, Set<String> headers) {
for (String usefulHeader : headers) {
String headerValue = restRequest.header(usefulHeader);
if (headerValue != null) {
actionRequest.putHeader(usefulHeader, headerValue);
}
}
actionRequest.copyContextFrom(restRequest);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) {
copyHeadersAndContext(request, restRequest, headers);
return super.execute(action, request);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
copyHeadersAndContext(request, restRequest, headers);
super.execute(action, request, listener);
}

@Override
public ClusterAdminClient cluster() {
return clusterAdmin;
}

@Override
public IndicesAdminClient indices() {
return indicesAdmin;
}

private static final class ClusterAdmin extends FilterClient.ClusterAdmin {

private final RestRequest restRequest;
private final Set<String> headers;

private ClusterAdmin(ClusterAdminClient in, RestRequest restRequest, Set<String> headers) {
super(in);
this.restRequest = restRequest;
this.headers = headers;
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request) {
copyHeadersAndContext(request, restRequest, headers);
return super.execute(action, request);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request, ActionListener<Response> listener) {
copyHeadersAndContext(request, restRequest, headers);
super.execute(action, request, listener);
}
}

private final class IndicesAdmin extends FilterClient.IndicesAdmin {

private final RestRequest restRequest;
private final Set<String> headers;

private IndicesAdmin(IndicesAdminClient in, RestRequest restRequest, Set<String> headers) {
super(in);
this.restRequest = restRequest;
this.headers = headers;
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request) {
copyHeadersAndContext(request, restRequest, headers);
return super.execute(action, request);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request, ActionListener<Response> listener) {
copyHeadersAndContext(request, restRequest, headers);
super.execute(action, request, listener);
}
}
}
}
Expand Up @@ -26,10 +26,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestToXContentListener;

import java.util.Locale;
Expand All @@ -42,8 +39,8 @@
public class RestClusterHealthAction extends BaseRestHandler {

@Inject
public RestClusterHealthAction(Settings settings, Client client, RestController controller) {
super(settings, client);
public RestClusterHealthAction(Settings settings, RestController controller, ClientFactory clientFactory) {
super(settings, clientFactory);

controller.registerHandler(RestRequest.Method.GET, "/_cluster/health", this);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/health/{index}", this);
Expand Down
Expand Up @@ -36,8 +36,8 @@
public class RestNodesHotThreadsAction extends BaseRestHandler {

@Inject
public RestNodesHotThreadsAction(Settings settings, Client client, RestController controller) {
super(settings, client);
public RestNodesHotThreadsAction(Settings settings, RestController controller, ClientFactory clientFactory) {
super(settings, clientFactory);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/hotthreads", this);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/hot_threads", this);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/{nodeId}/hotthreads", this);
Expand Down
Expand Up @@ -45,9 +45,9 @@ public class RestNodesInfoAction extends BaseRestHandler {
private final static Set<String> ALLOWED_METRICS = Sets.newHashSet("http", "jvm", "network", "os", "plugins", "process", "settings", "thread_pool", "transport");

@Inject
public RestNodesInfoAction(Settings settings, Client client, RestController controller,
SettingsFilter settingsFilter) {
super(settings, client);
public RestNodesInfoAction(Settings settings, RestController controller,
SettingsFilter settingsFilter, ClientFactory clientFactory) {
super(settings, clientFactory);
controller.registerHandler(GET, "/_nodes", this);
// this endpoint is used for metrics, not for nodeIds, like /_nodes/fs
controller.registerHandler(GET, "/_nodes/{nodeId}", this);
Expand Down
Expand Up @@ -35,8 +35,8 @@
public class RestNodesRestartAction extends BaseRestHandler {

@Inject
public RestNodesRestartAction(Settings settings, Client client, RestController controller) {
super(settings, client);
public RestNodesRestartAction(Settings settings, RestController controller, ClientFactory clientFactory) {
super(settings, clientFactory);

controller.registerHandler(RestRequest.Method.POST, "/_cluster/nodes/_restart", this);
controller.registerHandler(RestRequest.Method.POST, "/_cluster/nodes/{nodeId}/_restart", this);
Expand Down
Expand Up @@ -36,8 +36,8 @@
public class RestNodesShutdownAction extends BaseRestHandler {

@Inject
public RestNodesShutdownAction(Settings settings, Client client, RestController controller) {
super(settings, client);
public RestNodesShutdownAction(Settings settings, RestController controller, ClientFactory clientFactory) {
super(settings, clientFactory);

controller.registerHandler(RestRequest.Method.POST, "/_shutdown", this);
controller.registerHandler(RestRequest.Method.POST, "/_cluster/nodes/_shutdown", this);
Expand Down

0 comments on commit 8446948

Please sign in to comment.