Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduced a transient context to the rest request #7610

Merged
merged 1 commit into from Sep 5, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
129 changes: 129 additions & 0 deletions src/main/java/org/elasticsearch/common/ContextHolder.java
@@ -0,0 +1,129 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common;

import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;

/**
*
*/
public class ContextHolder {

private ObjectObjectOpenHashMap<Object, Object> context;

/**
* Attaches the given value to the context.
*
* @return The previous value that was associated with the given key in the context, or
* {@code null} if there was none.
*/
@SuppressWarnings("unchecked")
public final synchronized <V> V putInContext(Object key, Object value) {
if (context == null) {
context = new ObjectObjectOpenHashMap<>(2);
}
return (V) context.put(key, value);
}

/**
* Attaches the given values to the context
*/
public final synchronized void putAllInContext(ObjectObjectAssociativeContainer<Object, Object> map) {
if (map == null) {
return;
}
if (context == null) {
context = new ObjectObjectOpenHashMap<>(map);
} else {
context.putAll(map);
}
}

/**
* @return The context value that is associated with the given key
*
* @see #putInContext(Object, Object)
*/
@SuppressWarnings("unchecked")
public final synchronized <V> V getFromContext(Object key) {
return context != null ? (V) context.get(key) : null;
}

/**
* @param defaultValue The default value that should be returned for the given key, if no
* value is currently associated with it.
*
* @return The value that is associated with the given key in the context
*
* @see #putInContext(Object, Object)
*/
@SuppressWarnings("unchecked")
public final synchronized <V> V getFromContext(Object key, V defaultValue) {
V value = getFromContext(key);
return value == null ? defaultValue : value;
}

/**
* Checks if the context contains an entry with the given key
*/
public final synchronized boolean hasInContext(Object key) {
return context != null && context.containsKey(key);
}

/**
* @return The number of values attached in the context.
*/
public final synchronized int contextSize() {
return context != null ? context.size() : 0;
}

/**
* Checks if the context is empty.
*/
public final synchronized boolean isContextEmpty() {
return context == null || context.isEmpty();
}

/**
* @return A safe immutable copy of the current context.
*/
public synchronized ImmutableOpenMap<Object, Object> getContext() {
return context != null ? ImmutableOpenMap.copyOf(context) : ImmutableOpenMap.of();
}

/**
* Copies the context from the given context holder to this context holder. Any shared keys between
* the two context will be overridden by the given context holder.
*/
public synchronized void copyContextFrom(ContextHolder other) {
synchronized (other) {
if (other.context == null) {
return;
}
if (context == null) {
context = new ObjectObjectOpenHashMap<>(other.context);
} else {
context.putAll(other.context);
}
}
}
}
58 changes: 38 additions & 20 deletions src/main/java/org/elasticsearch/rest/BaseRestHandler.java
Expand Up @@ -32,7 +32,12 @@
import java.util.Set;

/**
* Base handler for REST requests
* Base handler for REST requests.
*
* This handler makes sure that the headers & context of the handled {@link RestRequest requests} are copied over to
* the transport requests executed by the associated client. While the context is fully copied over, not all the headers
* are copied, but a selected few. It is possible to control what header are copied over by registering them using
* {@link #addUsefulHeaders(String...)}
*/
public abstract class BaseRestHandler extends AbstractComponent implements RestHandler {

Expand Down Expand Up @@ -61,44 +66,45 @@ protected BaseRestHandler(Settings settings, Client client) {

@Override
public final void handleRequest(RestRequest request, RestChannel channel) throws Exception {
handleRequest(request, channel, usefulHeaders.size() == 0 ? client : new HeadersCopyClient(client, request, usefulHeaders));
handleRequest(request, channel, usefulHeaders.size() == 0 ? client : new HeadersAndContextCopyClient(client, request, usefulHeaders));
}

protected abstract void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception;

static final class HeadersCopyClient extends FilterClient {
static final class HeadersAndContextCopyClient extends FilterClient {

private final RestRequest restRequest;
private final Set<String> usefulHeaders;
private final IndicesAdmin indicesAdmin;
private final ClusterAdmin clusterAdmin;
private final Set<String> headers;

HeadersCopyClient(Client in, RestRequest restRequest, Set<String> usefulHeaders) {
HeadersAndContextCopyClient(Client in, RestRequest restRequest, Set<String> headers) {
super(in);
this.restRequest = restRequest;
this.usefulHeaders = usefulHeaders;
this.indicesAdmin = new IndicesAdmin(in.admin().indices());
this.clusterAdmin = new ClusterAdmin(in.admin().cluster());
this.indicesAdmin = new IndicesAdmin(in.admin().indices(), restRequest, headers);
this.clusterAdmin = new ClusterAdmin(in.admin().cluster(), restRequest, headers);
this.headers = headers;
}

private void copyHeaders(ActionRequest request) {
for (String usefulHeader : usefulHeaders) {
private static void copyHeadersAndContext(ActionRequest actionRequest, RestRequest restRequest, Set<String> headers) {
for (String usefulHeader : headers) {
String headerValue = restRequest.header(usefulHeader);
if (headerValue != null) {
request.putHeader(usefulHeader, headerValue);
actionRequest.putHeader(usefulHeader, headerValue);
}
}
actionRequest.copyContextFrom(restRequest);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) {
copyHeaders(request);
copyHeadersAndContext(request, restRequest, headers);
return super.execute(action, request);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
copyHeaders(request);
copyHeadersAndContext(request, restRequest, headers);
super.execute(action, request, listener);
}

Expand All @@ -112,38 +118,50 @@ public IndicesAdminClient indices() {
return indicesAdmin;
}

private final class ClusterAdmin extends FilterClient.ClusterAdmin {
private ClusterAdmin(ClusterAdminClient in) {
private static final class ClusterAdmin extends FilterClient.ClusterAdmin {

private final RestRequest restRequest;
private final Set<String> headers;

private ClusterAdmin(ClusterAdminClient in, RestRequest restRequest, Set<String> headers) {
super(in);
this.restRequest = restRequest;
this.headers = headers;
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request) {
copyHeaders(request);
copyHeadersAndContext(request, restRequest, headers);
return super.execute(action, request);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request, ActionListener<Response> listener) {
copyHeaders(request);
copyHeadersAndContext(request, restRequest, headers);
super.execute(action, request, listener);
}
}

private final class IndicesAdmin extends FilterClient.IndicesAdmin {
private IndicesAdmin(IndicesAdminClient in) {

private final RestRequest restRequest;
private final Set<String> headers;

private IndicesAdmin(IndicesAdminClient in, RestRequest restRequest, Set<String> headers) {
super(in);
this.restRequest = restRequest;
this.headers = headers;
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request) {
copyHeaders(request);
copyHeadersAndContext(request, restRequest, headers);
return super.execute(action, request);
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request, ActionListener<Response> listener) {
copyHeaders(request);
copyHeadersAndContext(request, restRequest, headers);
super.execute(action, request, listener);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/elasticsearch/rest/RestRequest.java
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.ContextHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -38,7 +39,7 @@
/**
*
*/
public abstract class RestRequest implements ToXContent.Params {
public abstract class RestRequest extends ContextHolder implements ToXContent.Params {

public enum Method {
GET, POST, PUT, DELETE, OPTIONS, HEAD
Expand Down
84 changes: 5 additions & 79 deletions src/main/java/org/elasticsearch/transport/TransportMessage.java
Expand Up @@ -20,7 +20,7 @@
package org.elasticsearch.transport;

import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.ContextHolder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
Expand All @@ -33,12 +33,10 @@
import java.util.Set;

/**
*
* The transport message is also a {@link ContextHolder context holder} that holds <b>transient</b> context, that is,
* the context is not serialized with message.
*/
public abstract class TransportMessage<TM extends TransportMessage<TM>> implements Streamable {

// a transient (not serialized with the request) key/value registry
private ObjectObjectOpenHashMap<Object, Object> context;
public abstract class TransportMessage<TM extends TransportMessage<TM>> extends ContextHolder implements Streamable {

private Map<String, Object> headers;

Expand All @@ -54,9 +52,7 @@ protected TransportMessage(TM message) {
if (((TransportMessage<?>) message).headers != null) {
this.headers = new HashMap<>(((TransportMessage<?>) message).headers);
}
if (((TransportMessage<?>) message).context != null) {
this.context = new ObjectObjectOpenHashMap<>(((TransportMessage<?>) message).context);
}
copyContextFrom(message);
}

public void remoteAddress(TransportAddress remoteAddress) {
Expand Down Expand Up @@ -89,76 +85,6 @@ public Set<String> getHeaders() {
return headers != null ? headers.keySet() : Collections.<String>emptySet();
}

/**
* Attaches the given transient value to the request - this value will not be serialized
* along with the request.
*
* There are many use cases such data is required, for example, when processing the
* request headers and building other constructs from them, one could "cache" the
* already built construct to avoid reprocessing the header over and over again.
*
* @return The previous value that was associated with the given key in the context, or
* {@code null} if there was none.
*/
@SuppressWarnings("unchecked")
public final synchronized <V> V putInContext(Object key, Object value) {
if (context == null) {
context = new ObjectObjectOpenHashMap<>(2);
}
return (V) context.put(key, value);
}

/**
* @return The transient value that is associated with the given key in the request context
* @see #putInContext(Object, Object)
*/
@SuppressWarnings("unchecked")
public final synchronized <V> V getFromContext(Object key) {
return context != null ? (V) context.get(key) : null;
}

/**
* @param defaultValue The default value that should be returned for the given key, if no
* value is currently associated with it.
*
* @return The transient value that is associated with the given key in the request context
*
* @see #putInContext(Object, Object)
*/
@SuppressWarnings("unchecked")
public final synchronized <V> V getFromContext(Object key, V defaultValue) {
V value = getFromContext(key);
return value == null ? defaultValue : value;
}

/**
* Checks if the request context contains an entry with the given key
*/
public final synchronized boolean hasInContext(Object key) {
return context != null && context.containsKey(key);
}

/**
* @return The number of transient values attached in the request context.
*/
public final synchronized int contextSize() {
return context != null ? context.size() : 0;
}

/**
* Checks if the request context is empty.
*/
public final synchronized boolean isContextEmpty() {
return context == null || context.isEmpty();
}

/**
* @return A safe immutable copy of the current context of this request.
*/
public synchronized ImmutableOpenMap<Object, Object> getContext() {
return context != null ? ImmutableOpenMap.copyOf(context) : ImmutableOpenMap.of();
}

@Override
public void readFrom(StreamInput in) throws IOException {
headers = in.readBoolean() ? in.readMap() : null;
Expand Down