Skip to content

Commit

Permalink
Call callback on actual mapping processed
Browse files Browse the repository at this point in the history
only callback the registered callback listeners when mapping have actually been processed...
closes #6748
  • Loading branch information
kimchy committed Jul 7, 2014
1 parent 41b81f9 commit 03c7997
Showing 1 changed file with 53 additions and 31 deletions.
Expand Up @@ -25,12 +25,10 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -116,19 +114,19 @@ static class UpdateTask extends MappingTask {
* as possible so we won't create the same index all the time for example for the updates on the same mapping
* and generate a single cluster change event out of all of those.
*/
ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long insertionOrder) throws Exception {
Tuple<ClusterState, List<MappingTask>> executeRefreshOrUpdate(final ClusterState currentState, final long insertionOrder) throws Exception {
final List<MappingTask> allTasks = new ArrayList<>();

synchronized (refreshOrUpdateMutex) {
if (refreshOrUpdateQueue.isEmpty()) {
return currentState;
return Tuple.tuple(currentState, allTasks);
}

// we already processed this task in a bulk manner in a previous cluster event, simply ignore
// it so we will let other tasks get in and processed ones, we will handle the queued ones
// later on in a subsequent cluster state event
if (insertionOrder < refreshOrUpdateProcessedInsertOrder) {
return currentState;
return Tuple.tuple(currentState, allTasks);
}

allTasks.addAll(refreshOrUpdateQueue);
Expand All @@ -138,7 +136,7 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long
}

if (allTasks.isEmpty()) {
return currentState;
return Tuple.tuple(currentState, allTasks);
}

// break down to tasks per index, so we can optimize the on demand index service creation
Expand Down Expand Up @@ -245,25 +243,10 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long
}
}

// fork sending back updates, so we won't wait to send them back on the cluster state, there
// might be a few of those...
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (Object task : allTasks) {
if (task instanceof UpdateTask) {
UpdateTask uTask = (UpdateTask) task;
ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
uTask.listener.onResponse(response);
}
}
}
});

if (!dirty) {
return currentState;
return Tuple.tuple(currentState, allTasks);
}
return ClusterState.builder(currentState).metaData(mdBuilder).build();
return Tuple.tuple(ClusterState.builder(currentState).metaData(mdBuilder).build(), allTasks);
}

private boolean processIndexMappingTasks(List<MappingTask> tasks, IndexService indexService, IndexMetaData.Builder builder) {
Expand Down Expand Up @@ -349,15 +332,33 @@ public void refreshMapping(final String index, final String indexUUID, final Str
insertOrder = ++refreshOrUpdateInsertOrder;
refreshOrUpdateQueue.add(new RefreshTask(index, indexUUID, types));
}
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
private volatile List<MappingTask> allTasks;

@Override
public void onFailure(String source, Throwable t) {
logger.warn("failure during [{}]", t, source);
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return executeRefreshOrUpdate(currentState, insertOrder);
Tuple<ClusterState, List<MappingTask>> tuple = executeRefreshOrUpdate(currentState, insertOrder);
this.allTasks = tuple.v2();
return tuple.v1();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (allTasks == null) {
return;
}
for (Object task : allTasks) {
if (task instanceof UpdateTask) {
UpdateTask uTask = (UpdateTask) task;
ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
uTask.listener.onResponse(response);
}
}
}
});
}
Expand All @@ -368,15 +369,36 @@ public void updateMapping(final String index, final String indexUUID, final Stri
insertOrder = ++refreshOrUpdateInsertOrder;
refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, order, nodeId, listener));
}
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "] / node [" + nodeId + "], order [" + order + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "] / node [" + nodeId + "], order [" + order + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
private volatile List<MappingTask> allTasks;

public void onFailure(String source, Throwable t) {
listener.onFailure(t);
}

@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
return executeRefreshOrUpdate(currentState, insertOrder);
Tuple<ClusterState, List<MappingTask>> tuple = executeRefreshOrUpdate(currentState, insertOrder);
this.allTasks = tuple.v2();
return tuple.v1();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (allTasks == null) {
return;
}
for (Object task : allTasks) {
if (task instanceof UpdateTask) {
UpdateTask uTask = (UpdateTask) task;
ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
try {
uTask.listener.onResponse(response);
} catch (Throwable t) {
logger.debug("failed ot ping back on response of mapping processing for task [{}]", t, uTask.listener);
}
}
}
}
});
}
Expand All @@ -400,7 +422,7 @@ public ClusterState execute(ClusterState currentState) {
for (String indexName : request.indices()) {
IndexMetaData indexMetaData = currentState.metaData().index(indexName);
IndexMetaData.Builder indexBuilder = IndexMetaData.builder(indexMetaData);

if (indexMetaData != null) {
boolean isLatestIndexWithout = true;
for (String type : request.types()) {
Expand Down

0 comments on commit 03c7997

Please sign in to comment.