Skip to content

Commit

Permalink
Internal: refactored TransportSingleCustomOperationAction, subclasses…
Browse files Browse the repository at this point in the history
… and requests

TransportSingleCustomOperationAction is subclassed by two similar, yet different transport action: TransportAnalyzeAction and TransportGetFieldMappingsAction. Made their difference and similarities more explicit by sharing common code and moving specific code to subclasses:
- moved index field to the parent SingleCustomOperationAction class
- moved the common check blocks code to the parent transport action class
- moved the main transport handler to the TransportAnalyzeAction subclass as it is only used to receive external requests through clients. In the case of the TransportGetFieldMappingsIndexAction instead, the action is internal and executed only locally as part of the user facing TransportGetFieldMappingsAction. The corresponding request gets sent over the transport though as part of the related shard request
- removed the get field mappings index action from the action names mapping as it is not a transport handler anymore. It was before although never used.

Closes #7214
  • Loading branch information
javanna committed Aug 11, 2014
1 parent ac40eae commit a038609
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 131 deletions.
Expand Up @@ -20,8 +20,6 @@

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand All @@ -36,9 +34,7 @@
* A request to analyze a text associated with a specific index. Allow to provide
* the actual analyzer name to perform the analysis with.
*/
public class AnalyzeRequest extends SingleCustomOperationRequest<AnalyzeRequest> implements IndicesRequest {

private String index;
public class AnalyzeRequest extends SingleCustomOperationRequest<AnalyzeRequest> {

private String text;

Expand Down Expand Up @@ -72,36 +68,14 @@ public AnalyzeRequest(String text) {
* @param text The text to analyze
*/
public AnalyzeRequest(@Nullable String index, String text) {
this.index = index;
this.index(index);
this.text = text;
}

public String text() {
return this.text;
}

public AnalyzeRequest index(String index) {
this.index = index;
return this;
}

public String index() {
return this.index;
}

@Override
public String[] indices() {
if (index == null) {
return Strings.EMPTY_ARRAY;
}
return new String[]{index};
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}

public AnalyzeRequest analyzer(String analyzer) {
this.analyzer = analyzer;
return this;
Expand Down Expand Up @@ -165,7 +139,6 @@ public ActionRequestValidationException validate() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readOptionalString();
text = in.readString();
analyzer = in.readOptionalString();
tokenizer = in.readOptionalString();
Expand All @@ -179,7 +152,6 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(index);
out.writeString(text);
out.writeOptionalString(analyzer);
out.writeOptionalString(tokenizer);
Expand Down
Expand Up @@ -27,12 +27,12 @@
import org.apache.lucene.analysis.tokenattributes.TypeAttribute;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
Expand All @@ -44,13 +44,15 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;

/**
*
* Transport action used to execute analyze requests
*/
public class TransportAnalyzeAction extends TransportSingleCustomOperationAction<AnalyzeRequest, AnalyzeResponse> {

Expand All @@ -64,6 +66,7 @@ public TransportAnalyzeAction(Settings settings, ThreadPool threadPool, ClusterS
super(settings, AnalyzeAction.NAME, threadPool, clusterService, transportService, actionFilters);
this.indicesService = indicesService;
this.indicesAnalysisService = indicesAnalysisService;
transportService.registerHandler(AnalyzeAction.NAME, new TransportHandler());
}

@Override
Expand All @@ -81,16 +84,11 @@ protected AnalyzeResponse newResponse() {
return new AnalyzeResponse();
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, AnalyzeRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, AnalyzeRequest request) {
if (request.index() != null) {
request.index(state.metaData().concreteSingleIndex(request.index(), request.indicesOptions()));
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
return super.checkRequestBlock(state, request);
}
return null;
}
Expand Down Expand Up @@ -253,4 +251,44 @@ protected AnalyzeResponse shardOperation(AnalyzeRequest request, int shardId) th

return new AnalyzeResponse(tokens);
}

private class TransportHandler extends BaseTransportRequestHandler<AnalyzeRequest> {

@Override
public AnalyzeRequest newInstance() {
return newRequest();
}

@Override
public void messageReceived(AnalyzeRequest request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
request.listenerThreaded(false);
// if we have a local operation, execute it on a thread since we don't spawn
request.operationThreaded(true);
execute(request, new ActionListener<AnalyzeResponse>() {
@Override
public void onResponse(AnalyzeResponse result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}

@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response for get", e1);
}
}
});
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
}
Expand Up @@ -19,19 +19,14 @@

package org.elasticsearch.action.admin.indices.mapping.get;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetFieldMappingsIndexRequest> implements IndicesRequest {

private String index;
class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetFieldMappingsIndexRequest> {

private boolean probablySingleFieldRequest;
private boolean includeDefaults;
Expand All @@ -48,21 +43,7 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
this.types = other.types();
this.fields = other.fields();
assert index != null;
this.index = index;
}

public String index() {
return index;
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}

@Override
public String[] indices() {
return new String[]{index};
this.index(index);
}

public String[] types() {
Expand All @@ -81,34 +62,31 @@ public boolean includeDefaults() {
return includeDefaults;
}

/** Indicates whether default mapping settings should be returned */
public GetFieldMappingsIndexRequest includeDefaults(boolean includeDefaults) {
this.includeDefaults = includeDefaults;
return this;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeStringArray(types);
out.writeStringArray(fields);
out.writeBoolean(includeDefaults);
out.writeBoolean(probablySingleFieldRequest);
}

@Override
protected void writeIndex(StreamOutput out) throws IOException {
out.writeString(index());
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
types = in.readStringArray();
fields = in.readStringArray();
includeDefaults = in.readBoolean();
probablySingleFieldRequest = in.readBoolean();
}

@Override
protected void readIndex(StreamInput in) throws IOException {
index(in.readString());
}
}
Expand Up @@ -28,8 +28,6 @@
import org.elasticsearch.action.support.single.custom.TransportSingleCustomOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -53,6 +51,7 @@
import java.util.List;

/**
* Transport action used to retrieve the mappings related to fields that belong to a specific index
*/
public class TransportGetFieldMappingsIndexAction extends TransportSingleCustomOperationAction<GetFieldMappingsIndexRequest, GetFieldMappingsResponse> {

Expand Down Expand Up @@ -125,16 +124,6 @@ protected GetFieldMappingsResponse newResponse() {
return new GetFieldMappingsResponse();
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, GetFieldMappingsIndexRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, GetFieldMappingsIndexRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index());
}

private static final ToXContent.Params includeDefaultsParams = new ToXContent.Params() {

final static String INCLUDE_DEFAULTS = "include_defaults";
Expand Down
Expand Up @@ -20,6 +20,9 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand All @@ -28,10 +31,11 @@
/**
*
*/
public abstract class SingleCustomOperationRequest<T extends SingleCustomOperationRequest> extends ActionRequest<T> {
public abstract class SingleCustomOperationRequest<T extends SingleCustomOperationRequest> extends ActionRequest<T> implements IndicesRequest {

private boolean threadedOperation = true;
private boolean preferLocal = true;
private String index;

protected SingleCustomOperationRequest() {
}
Expand Down Expand Up @@ -67,6 +71,29 @@ public final T preferLocal(boolean preferLocal) {
return (T) this;
}

@SuppressWarnings("unchecked")
public T index(String index) {
this.index = index;
return (T)this;
}

public String index() {
return index;
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}

@Override
public String[] indices() {
if (index == null) {
return Strings.EMPTY_ARRAY;
}
return new String[]{index};
}

/**
* if this operation hits a node with a local relevant shard, should it be preferred
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
Expand All @@ -83,12 +110,22 @@ public void beforeLocalFork() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
preferLocal = in.readBoolean();
readIndex(in);
}

protected void readIndex(StreamInput in) throws IOException {
index = in.readOptionalString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(preferLocal);
writeIndex(out);
}

protected void writeIndex(StreamOutput out) throws IOException {
out.writeOptionalString(index);
}
}

0 comments on commit a038609

Please sign in to comment.