Skip to content

Commit

Permalink
Introduced a transient context to the rest request
Browse files Browse the repository at this point in the history
Similar to the one in `TransportMessage`. Added the `ContextHolder` base class where both `TransportMessage` and `RestRequest` derive from

Now next to the known headers, the context is always copied over from the rest request to the transport request (when the injected client is used)
  • Loading branch information
uboness committed Sep 5, 2014
1 parent 8b8cc80 commit 5df9c04
Show file tree
Hide file tree
Showing 7 changed files with 457 additions and 173 deletions.
129 changes: 129 additions & 0 deletions src/main/java/org/elasticsearch/common/ContextHolder.java
@@ -0,0 +1,129 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common;

import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;

/**
*
*/
public class ContextHolder {

private ObjectObjectOpenHashMap<Object, Object> context;

/**
* Attaches the given value to the context.
*
* @return The previous value that was associated with the given key in the context, or
* {@code null} if there was none.
*/
@SuppressWarnings("unchecked")
public final synchronized <V> V putInContext(Object key, Object value) {
if (context == null) {
context = new ObjectObjectOpenHashMap<>(2);
}
return (V) context.put(key, value);
}

/**
* Attaches the given values to the context
*/
public final synchronized void putAllInContext(ObjectObjectAssociativeContainer<Object, Object> map) {
if (map == null) {
return;
}
if (context == null) {
context = new ObjectObjectOpenHashMap<>(map);
} else {
context.putAll(map);
}
}

/**
* @return The context value that is associated with the given key
*
* @see #putInContext(Object, Object)
*/
@SuppressWarnings("unchecked")
public final synchronized <V> V getFromContext(Object key) {
return context != null ? (V) context.get(key) : null;
}

/**
* @param defaultValue The default value that should be returned for the given key, if no
* value is currently associated with it.
*
* @return The value that is associated with the given key in the context
*
* @see #putInContext(Object, Object)
*/
@SuppressWarnings("unchecked")
public final synchronized <V> V getFromContext(Object key, V defaultValue) {
V value = getFromContext(key);
return value == null ? defaultValue : value;
}

/**
* Checks if the context contains an entry with the given key
*/
public final synchronized boolean hasInContext(Object key) {
return context != null && context.containsKey(key);
}

/**
* @return The number of values attached in the context.
*/
public final synchronized int contextSize() {
return context != null ? context.size() : 0;
}

/**
* Checks if the context is empty.
*/
public final synchronized boolean isContextEmpty() {
return context == null || context.isEmpty();
}

/**
* @return A safe immutable copy of the current context.
*/
public synchronized ImmutableOpenMap<Object, Object> getContext() {
return context != null ? ImmutableOpenMap.copyOf(context) : ImmutableOpenMap.of();
}

/**
* Copies the context from the given context holder to this context holder. Any shared keys between
* the two context will be overridden by the given context holder.
*/
public synchronized void copyContextFrom(ContextHolder other) {
synchronized (other) {
if (other.context == null) {
return;
}
if (context == null) {
context = new ObjectObjectOpenHashMap<>(other.context);
} else {
context.putAll(other.context);
}
}
}
}
58 changes: 38 additions & 20 deletions src/main/java/org/elasticsearch/rest/BaseRestHandler.java
Expand Up @@ -32,7 +32,12 @@
import java.util.Set;

/**
* Base handler for REST requests
* Base handler for REST requests.
*
* This handler makes sure that the headers & context of the handled {@link RestRequest requests} are copied over to
* the transport requests executed by the associated client. While the context is fully copied over, not all the headers
* are copied, but a selected few. It is possible to control what header are copied over by registering them using
* {@link #addUsefulHeaders(String...)}
*/
public abstract class BaseRestHandler extends AbstractComponent implements RestHandler {

Expand Down Expand Up @@ -61,44 +66,45 @@ protected BaseRestHandler(Settings settings, Client client) {

@Override
public final void handleRequest(RestRequest request, RestChannel channel) throws Exception {
handleRequest(request, channel, usefulHeaders.size() == 0 ? client : new HeadersCopyClient(client, request, usefulHeaders));
handleRequest(request, channel, usefulHeaders.size() == 0 ? client : new HeadersAndContextCopyClient(client, request, usefulHeaders));

This comment has been minimized.

Copy link
@javanna

javanna Sep 9, 2014

Member

@uboness are we sure that we want to copy the request context only if there are usefulHeaders registered? I think this condition might be wrong now that we copy the context too?

This comment has been minimized.

Copy link
@uboness

uboness Sep 9, 2014

Author Contributor

aaaaiiiiiii... good catch!!!!! we always want to copy the context regardless of the headers

This comment has been minimized.

Copy link
@javanna

javanna Sep 9, 2014

Member

Ok, downside of this is that we will always need to wrap the client, while previously we would do it only if a plugin registered useful headers, empty by default. Do we want to do anything about this? Maybe allow to disable copying the context?

}

protected abstract void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception;

static final class HeadersCopyClient extends FilterClient {
static final class HeadersAndContextCopyClient extends FilterClient {

private final RestRequest restRequest;
private final Set<String> usefulHeaders;
private final IndicesAdmin indicesAdmin;
private final ClusterAdmin clusterAdmin;
private final Set<String> headers;

HeadersCopyClient(Client in, RestRequest restRequest, Set<String> usefulHeaders) {
HeadersAndContextCopyClient(Client in, RestRequest restRequest, Set<String> headers) {
super(in);
this.restRequest = restRequest;
this.usefulHeaders = usefulHeaders;
this.indicesAdmin = new IndicesAdmin(in.admin().indices());
this.clusterAdmin = new ClusterAdmin(in.admin().cluster());
this.indicesAdmin = new IndicesAdmin(in.admin().indices(), restRequest, headers);
this.clusterAdmin = new ClusterAdmin(in.admin().cluster(), restRequest, headers);
this.headers = headers;
}

private void copyHeaders(ActionRequest request) {
for (String usefulHeader : usefulHeaders) {
private static void copyHeadersAndContext(ActionRequest actionRequest, RestRequest restRequest, Set<String> headers) {
for (String usefulHeader : headers) {
String headerValue = restRequest.header(usefulHeader);
if (headerValue != null) {
request.putHeader(usefulHeader, headerValue);
actionRequest.putHeader(usefulHeader, headerValue);
}
}
actionRequest.copyContextFrom(restRequest);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) {
copyHeaders(request);
copyHeadersAndContext(request, restRequest, headers);
return super.execute(action, request);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
copyHeaders(request);
copyHeadersAndContext(request, restRequest, headers);
super.execute(action, request, listener);
}

Expand All @@ -112,38 +118,50 @@ public IndicesAdminClient indices() {
return indicesAdmin;
}

private final class ClusterAdmin extends FilterClient.ClusterAdmin {
private ClusterAdmin(ClusterAdminClient in) {
private static final class ClusterAdmin extends FilterClient.ClusterAdmin {

private final RestRequest restRequest;
private final Set<String> headers;

private ClusterAdmin(ClusterAdminClient in, RestRequest restRequest, Set<String> headers) {
super(in);
this.restRequest = restRequest;
this.headers = headers;
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request) {
copyHeaders(request);
copyHeadersAndContext(request, restRequest, headers);
return super.execute(action, request);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request, ActionListener<Response> listener) {
copyHeaders(request);
copyHeadersAndContext(request, restRequest, headers);
super.execute(action, request, listener);
}
}

private final class IndicesAdmin extends FilterClient.IndicesAdmin {
private IndicesAdmin(IndicesAdminClient in) {

private final RestRequest restRequest;
private final Set<String> headers;

private IndicesAdmin(IndicesAdminClient in, RestRequest restRequest, Set<String> headers) {
super(in);
this.restRequest = restRequest;
this.headers = headers;
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request) {
copyHeaders(request);
copyHeadersAndContext(request, restRequest, headers);
return super.execute(action, request);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request, ActionListener<Response> listener) {
copyHeaders(request);
copyHeadersAndContext(request, restRequest, headers);
super.execute(action, request, listener);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/elasticsearch/rest/RestRequest.java
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.ContextHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -38,7 +39,7 @@
/**
*
*/
public abstract class RestRequest implements ToXContent.Params {
public abstract class RestRequest extends ContextHolder implements ToXContent.Params {

public enum Method {
GET, POST, PUT, DELETE, OPTIONS, HEAD
Expand Down
84 changes: 5 additions & 79 deletions src/main/java/org/elasticsearch/transport/TransportMessage.java
Expand Up @@ -20,7 +20,7 @@
package org.elasticsearch.transport;

import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.ContextHolder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
Expand All @@ -33,12 +33,10 @@
import java.util.Set;

/**
*
* The transport message is also a {@link ContextHolder context holder} that holds <b>transient</b> context, that is,
* the context is not serialized with message.
*/
public abstract class TransportMessage<TM extends TransportMessage<TM>> implements Streamable {

// a transient (not serialized with the request) key/value registry
private ObjectObjectOpenHashMap<Object, Object> context;
public abstract class TransportMessage<TM extends TransportMessage<TM>> extends ContextHolder implements Streamable {

private Map<String, Object> headers;

Expand All @@ -54,9 +52,7 @@ protected TransportMessage(TM message) {
if (((TransportMessage<?>) message).headers != null) {
this.headers = new HashMap<>(((TransportMessage<?>) message).headers);
}
if (((TransportMessage<?>) message).context != null) {
this.context = new ObjectObjectOpenHashMap<>(((TransportMessage<?>) message).context);
}
copyContextFrom(message);
}

public void remoteAddress(TransportAddress remoteAddress) {
Expand Down Expand Up @@ -89,76 +85,6 @@ public Set<String> getHeaders() {
return headers != null ? headers.keySet() : Collections.<String>emptySet();
}

/**
* Attaches the given transient value to the request - this value will not be serialized
* along with the request.
*
* There are many use cases such data is required, for example, when processing the
* request headers and building other constructs from them, one could "cache" the
* already built construct to avoid reprocessing the header over and over again.
*
* @return The previous value that was associated with the given key in the context, or
* {@code null} if there was none.
*/
@SuppressWarnings("unchecked")
public final synchronized <V> V putInContext(Object key, Object value) {
if (context == null) {
context = new ObjectObjectOpenHashMap<>(2);
}
return (V) context.put(key, value);
}

/**
* @return The transient value that is associated with the given key in the request context
* @see #putInContext(Object, Object)
*/
@SuppressWarnings("unchecked")
public final synchronized <V> V getFromContext(Object key) {
return context != null ? (V) context.get(key) : null;
}

/**
* @param defaultValue The default value that should be returned for the given key, if no
* value is currently associated with it.
*
* @return The transient value that is associated with the given key in the request context
*
* @see #putInContext(Object, Object)
*/
@SuppressWarnings("unchecked")
public final synchronized <V> V getFromContext(Object key, V defaultValue) {
V value = getFromContext(key);
return value == null ? defaultValue : value;
}

/**
* Checks if the request context contains an entry with the given key
*/
public final synchronized boolean hasInContext(Object key) {
return context != null && context.containsKey(key);
}

/**
* @return The number of transient values attached in the request context.
*/
public final synchronized int contextSize() {
return context != null ? context.size() : 0;
}

/**
* Checks if the request context is empty.
*/
public final synchronized boolean isContextEmpty() {
return context == null || context.isEmpty();
}

/**
* @return A safe immutable copy of the current context of this request.
*/
public synchronized ImmutableOpenMap<Object, Object> getContext() {
return context != null ? ImmutableOpenMap.copyOf(context) : ImmutableOpenMap.of();
}

@Override
public void readFrom(StreamInput in) throws IOException {
headers = in.readBoolean() ? in.readMap() : null;
Expand Down

0 comments on commit 5df9c04

Please sign in to comment.