From 03c79971dd6250455cb78bc69988a9c885f665bf Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sun, 6 Jul 2014 02:19:49 +0200 Subject: [PATCH] Call callback on actual mapping processed only callback the registered callback listeners when mapping have actually been processed... closes #6748 --- .../metadata/MetaDataMappingService.java | 84 ++++++++++++------- 1 file changed, 53 insertions(+), 31 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 3d5ae715859ee..d269f8bfd3a1f 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -25,12 +25,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; @@ -116,19 +114,19 @@ static class UpdateTask extends MappingTask { * as possible so we won't create the same index all the time for example for the updates on the same mapping * and generate a single cluster change event out of all of those. */ - ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long insertionOrder) throws Exception { + Tuple> executeRefreshOrUpdate(final ClusterState currentState, final long insertionOrder) throws Exception { final List allTasks = new ArrayList<>(); synchronized (refreshOrUpdateMutex) { if (refreshOrUpdateQueue.isEmpty()) { - return currentState; + return Tuple.tuple(currentState, allTasks); } // we already processed this task in a bulk manner in a previous cluster event, simply ignore // it so we will let other tasks get in and processed ones, we will handle the queued ones // later on in a subsequent cluster state event if (insertionOrder < refreshOrUpdateProcessedInsertOrder) { - return currentState; + return Tuple.tuple(currentState, allTasks); } allTasks.addAll(refreshOrUpdateQueue); @@ -138,7 +136,7 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long } if (allTasks.isEmpty()) { - return currentState; + return Tuple.tuple(currentState, allTasks); } // break down to tasks per index, so we can optimize the on demand index service creation @@ -245,25 +243,10 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long } } - // fork sending back updates, so we won't wait to send them back on the cluster state, there - // might be a few of those... - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - for (Object task : allTasks) { - if (task instanceof UpdateTask) { - UpdateTask uTask = (UpdateTask) task; - ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true); - uTask.listener.onResponse(response); - } - } - } - }); - if (!dirty) { - return currentState; + return Tuple.tuple(currentState, allTasks); } - return ClusterState.builder(currentState).metaData(mdBuilder).build(); + return Tuple.tuple(ClusterState.builder(currentState).metaData(mdBuilder).build(), allTasks); } private boolean processIndexMappingTasks(List tasks, IndexService indexService, IndexMetaData.Builder builder) { @@ -349,7 +332,9 @@ public void refreshMapping(final String index, final String indexUUID, final Str insertOrder = ++refreshOrUpdateInsertOrder; refreshOrUpdateQueue.add(new RefreshTask(index, indexUUID, types)); } - clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() { + private volatile List allTasks; + @Override public void onFailure(String source, Throwable t) { logger.warn("failure during [{}]", t, source); @@ -357,7 +342,23 @@ public void onFailure(String source, Throwable t) { @Override public ClusterState execute(ClusterState currentState) throws Exception { - return executeRefreshOrUpdate(currentState, insertOrder); + Tuple> tuple = executeRefreshOrUpdate(currentState, insertOrder); + this.allTasks = tuple.v2(); + return tuple.v1(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (allTasks == null) { + return; + } + for (Object task : allTasks) { + if (task instanceof UpdateTask) { + UpdateTask uTask = (UpdateTask) task; + ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true); + uTask.listener.onResponse(response); + } + } } }); } @@ -368,15 +369,36 @@ public void updateMapping(final String index, final String indexUUID, final Stri insertOrder = ++refreshOrUpdateInsertOrder; refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, order, nodeId, listener)); } - clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "] / node [" + nodeId + "], order [" + order + "]", Priority.HIGH, new ClusterStateUpdateTask() { - @Override + clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "] / node [" + nodeId + "], order [" + order + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() { + private volatile List allTasks; + public void onFailure(String source, Throwable t) { listener.onFailure(t); } @Override public ClusterState execute(final ClusterState currentState) throws Exception { - return executeRefreshOrUpdate(currentState, insertOrder); + Tuple> tuple = executeRefreshOrUpdate(currentState, insertOrder); + this.allTasks = tuple.v2(); + return tuple.v1(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (allTasks == null) { + return; + } + for (Object task : allTasks) { + if (task instanceof UpdateTask) { + UpdateTask uTask = (UpdateTask) task; + ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true); + try { + uTask.listener.onResponse(response); + } catch (Throwable t) { + logger.debug("failed ot ping back on response of mapping processing for task [{}]", t, uTask.listener); + } + } + } } }); } @@ -400,7 +422,7 @@ public ClusterState execute(ClusterState currentState) { for (String indexName : request.indices()) { IndexMetaData indexMetaData = currentState.metaData().index(indexName); IndexMetaData.Builder indexBuilder = IndexMetaData.builder(indexMetaData); - + if (indexMetaData != null) { boolean isLatestIndexWithout = true; for (String type : request.types()) {