Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure that all delete mapping internal requests share the same original headers and context #7736

Merged
merged 1 commit into from Sep 18, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -47,6 +48,14 @@ public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {

}

/**
* Copy constructor that creates a new flush request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public FlushRequest(ActionRequest originalRequest) {
super(originalRequest);
}

/**
* Constructs a new flush request against one or more indices. If nothing is provided, all indices will
* be flushed.
Expand Down
Expand Up @@ -23,10 +23,13 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.TransportFlushAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;
Expand All @@ -35,7 +38,6 @@
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
Expand Down Expand Up @@ -112,7 +114,7 @@ protected ClusterBlockException checkBlock(DeleteMappingRequest request, Cluster
@Override
protected void masterOperation(final DeleteMappingRequest request, final ClusterState state, final ActionListener<DeleteMappingResponse> listener) throws ElasticsearchException {
final String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
flushAction.execute(Requests.flushRequest(concreteIndices), new ActionListener<FlushResponse>() {
flushAction.execute(new FlushRequest(request).indices(concreteIndices), new ActionListener<FlushResponse>() {
@Override
public void onResponse(FlushResponse flushResponse) {
if (logger.isTraceEnabled()) {
Expand All @@ -138,7 +140,9 @@ public void onResponse(FlushResponse flushResponse) {
request.types(types.toArray(new String[types.size()]));
QuerySourceBuilder querySourceBuilder = new QuerySourceBuilder()
.setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), filterBuilder));
deleteByQueryAction.execute(Requests.deleteByQueryRequest(concreteIndices).types(request.types()).source(querySourceBuilder), new ActionListener<DeleteByQueryResponse>() {

DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(request).indices(concreteIndices).types(request.types()).source(querySourceBuilder);
deleteByQueryAction.execute(deleteByQueryRequest, new ActionListener<DeleteByQueryResponse>() {
@Override
public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
if (logger.isTraceEnabled()) {
Expand All @@ -151,7 +155,7 @@ public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
}
}
}
refreshAction.execute(Requests.refreshRequest(concreteIndices), new ActionListener<RefreshResponse>() {
refreshAction.execute(new RefreshRequest(request).indices(concreteIndices), new ActionListener<RefreshResponse>() {
@Override
public void onResponse(RefreshResponse refreshResponse) {
if (logger.isTraceEnabled()) {
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.refresh;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -41,6 +42,14 @@ public class RefreshRequest extends BroadcastOperationRequest<RefreshRequest> {
RefreshRequest() {
}

/**
* Copy constructor that creates a new refresh request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public RefreshRequest(ActionRequest originalRequest) {
super(originalRequest);
}

public RefreshRequest(String... indices) {
super(indices);
}
Expand Down
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Charsets;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest;
Expand Down Expand Up @@ -72,6 +73,14 @@ public DeleteByQueryRequest(String... indices) {
public DeleteByQueryRequest() {
}

/**
* Copy constructor that creates a new delete by query request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public DeleteByQueryRequest(ActionRequest originalRequest) {
super(originalRequest);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
Expand Down Expand Up @@ -113,6 +122,7 @@ public DeleteByQueryRequest source(String query) {
/**
* The source to execute in the form of a map.
*/
@SuppressWarnings("unchecked")
public DeleteByQueryRequest source(Map source) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(Requests.CONTENT_TYPE);
Expand Down
Expand Up @@ -33,7 +33,7 @@
/**
*
*/
public class IndicesReplicationOperationRequest<T extends IndicesReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest {
public abstract class IndicesReplicationOperationRequest<T extends IndicesReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest {

protected TimeValue timeout = ShardReplicationOperationRequest.DEFAULT_TIMEOUT;
protected String[] indices;
Expand All @@ -46,6 +46,13 @@ public TimeValue timeout() {
return timeout;
}

protected IndicesReplicationOperationRequest() {
}

protected IndicesReplicationOperationRequest(ActionRequest actionRequest) {
super(actionRequest);
}

/**
* A timeout to wait if the delete by query operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
Expand Down Expand Up @@ -74,6 +81,7 @@ public IndicesOptions indicesOptions() {
return indicesOptions;
}

@SuppressWarnings("unchecked")
public T indicesOptions(IndicesOptions indicesOptions) {
if (indicesOptions == null) {
throw new IllegalArgumentException("IndicesOptions must not be null");
Expand Down