Skip to content

Commit

Permalink
Added support for node acknowledgements in delete mapping api
Browse files Browse the repository at this point in the history
  • Loading branch information
javanna committed Oct 28, 2013
1 parent 109029c commit be5118a
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 86 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.indices.mapping.delete;

import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;

/**
* Cluster state update request that allows to delete a mapping
*/
public class DeleteMappingClusterStateUpdateRequest extends ClusterStateUpdateRequest<DeleteMappingClusterStateUpdateRequest> {

private String[] indices;
private String type;

DeleteMappingClusterStateUpdateRequest() {

}

/**
* Returns the indices the operation needs to be executed on
*/
public String[] indices() {
return indices;
}

/**
* Sets the indices the operation needs to be executed on
*/
public DeleteMappingClusterStateUpdateRequest indices(String[] indices) {
this.indices = indices;
return this;
}

/**
* Returns the type to be removed
*/
public String type() {
return type;
}

/**
* Sets the type to be removed
*/
public DeleteMappingClusterStateUpdateRequest type(String type) {
this.type = type;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

package org.elasticsearch.action.admin.indices.mapping.delete;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand All @@ -29,9 +30,9 @@
import static org.elasticsearch.action.ValidateActions.addValidationError;

/**
*
* Represents a request to delete a mapping
*/
public class DeleteMappingRequest extends MasterNodeOperationRequest<DeleteMappingRequest> {
public class DeleteMappingRequest extends AcknowledgedRequest<DeleteMappingRequest> {

private String[] indices;

Expand All @@ -41,7 +42,7 @@ public class DeleteMappingRequest extends MasterNodeOperationRequest<DeleteMappi
}

/**
* Constructs a new put mapping request against one or more indices. If nothing is set then
* Constructs a new delete mapping request against one or more indices. If nothing is set then
* it will be executed against all indices.
*/
public DeleteMappingRequest(String... indices) {
Expand All @@ -58,15 +59,15 @@ public ActionRequestValidationException validate() {
}

/**
* Sets the indices this put mapping operation will execute on.
* Sets the indices this delete mapping operation will execute on.
*/
public DeleteMappingRequest indices(String[] indices) {
this.indices = indices;
return this;
}

/**
* The indices the mappings will be put.
* The indices the mappings will be removed from.
*/
public String[] indices() {
return indices;
Expand Down Expand Up @@ -97,6 +98,7 @@ public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
type = in.readString();
}
readTimeout(in, Version.V_0_90_6);
}

@Override
Expand All @@ -116,5 +118,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(true);
out.writeString(type);
}
writeTimeout(out, Version.V_0_90_6);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,29 @@
package org.elasticsearch.action.admin.indices.mapping.delete;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.internal.InternalIndicesAdminClient;

/**
*
* Builder for a delete mapping request
*/
public class DeleteMappingRequestBuilder extends MasterNodeOperationRequestBuilder<DeleteMappingRequest, DeleteMappingResponse, DeleteMappingRequestBuilder> {
public class DeleteMappingRequestBuilder extends AcknowledgedRequestBuilder<DeleteMappingRequest, DeleteMappingResponse, DeleteMappingRequestBuilder> {

public DeleteMappingRequestBuilder(IndicesAdminClient indicesClient) {
super((InternalIndicesAdminClient) indicesClient, new DeleteMappingRequest());
}

/**
* Sets the indices the delete mapping will execute on
*/
public DeleteMappingRequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;
}

/**
* The type of the mapping to remove.
* Sets the type of the mapping to remove
*/
public DeleteMappingRequestBuilder setType(String type) {
request.type(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

package org.elasticsearch.action.admin.indices.mapping.delete;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand All @@ -28,19 +29,25 @@
/**
* The response of remove mapping operation.
*/
public class DeleteMappingResponse extends ActionResponse {
public class DeleteMappingResponse extends AcknowledgedResponse {

DeleteMappingResponse() {

}

DeleteMappingResponse(boolean acknowledged) {
super(acknowledged);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in, Version.V_0_90_6);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out, Version.V_0_90_6);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
Expand Down Expand Up @@ -106,25 +108,24 @@ public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
refreshAction.execute(Requests.refreshRequest(request.indices()), new ActionListener<RefreshResponse>() {
@Override
public void onResponse(RefreshResponse refreshResponse) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() {
@Override
public void onResponse(MetaDataMappingService.Response response) {
listener.onResponse(new DeleteMappingResponse());
}

@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
}
});
removeMapping();
}

@Override
public void onFailure(Throwable e) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() {
removeMapping();
}

protected void removeMapping() {
DeleteMappingClusterStateUpdateRequest clusterStateUpdateRequest = new DeleteMappingClusterStateUpdateRequest()
.indices(request.indices()).type(request.type())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());

metaDataMappingService.removeMapping(clusterStateUpdateRequest, new ClusterStateUpdateListener() {
@Override
public void onResponse(MetaDataMappingService.Response response) {
listener.onResponse(new DeleteMappingResponse());
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new DeleteMappingResponse(response.isAcknowledged()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString;
Expand Down Expand Up @@ -293,12 +295,32 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
});
}

public void removeMapping(final RemoveRequest request, final Listener listener) {
clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() {
public void removeMapping(final DeleteMappingClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
clusterService.submitStateUpdateTask("remove-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask() {

@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}

@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterStateUpdateResponse(true));
}

@Override
public void onAckTimeout() {
listener.onResponse(new ClusterStateUpdateResponse(true));
}

@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}

@Override
public TimeValue timeout() {
return request.masterTimeout;
return request.masterNodeTimeout();
}

@Override
Expand All @@ -308,18 +330,18 @@ public void onFailure(String source, Throwable t) {

@Override
public ClusterState execute(ClusterState currentState) {
if (request.indices.length == 0) {
if (request.indices().length == 0) {
throw new IndexMissingException(new Index("_all"));
}

MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
boolean changed = false;
String latestIndexWithout = null;
for (String indexName : request.indices) {
for (String indexName : request.indices()) {
IndexMetaData indexMetaData = currentState.metaData().index(indexName);
if (indexMetaData != null) {
if (indexMetaData.mappings().containsKey(request.mappingType)) {
builder.put(newIndexMetaDataBuilder(indexMetaData).removeMapping(request.mappingType));
if (indexMetaData.mappings().containsKey(request.type())) {
builder.put(newIndexMetaDataBuilder(indexMetaData).removeMapping(request.type()));
changed = true;
} else {
latestIndexWithout = indexMetaData.index();
Expand All @@ -328,17 +350,17 @@ public ClusterState execute(ClusterState currentState) {
}

if (!changed) {
throw new TypeMissingException(new Index(latestIndexWithout), request.mappingType);
throw new TypeMissingException(new Index(latestIndexWithout), request.type());
}

logger.info("[{}] remove_mapping [{}]", request.indices, request.mappingType);
logger.info("[{}] remove_mapping [{}]", request.indices(), request.type());

return ClusterState.builder().state(currentState).metaData(builder).build();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new Response(true));

}
});
}
Expand Down Expand Up @@ -526,23 +548,6 @@ public static interface Listener {
void onFailure(Throwable t);
}

public static class RemoveRequest {

final String[] indices;
final String mappingType;
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;

public RemoveRequest(String[] indices, String mappingType) {
this.indices = indices;
this.mappingType = mappingType;
}

public RemoveRequest masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}

public static class PutRequest {

final String[] indices;
Expand Down

0 comments on commit be5118a

Please sign in to comment.