Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call callback on actual mapping processed #6748

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -25,12 +25,10 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -116,19 +114,19 @@ static class UpdateTask extends MappingTask {
* as possible so we won't create the same index all the time for example for the updates on the same mapping
* and generate a single cluster change event out of all of those.
*/
ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long insertionOrder) throws Exception {
Tuple<ClusterState, List<MappingTask>> executeRefreshOrUpdate(final ClusterState currentState, final long insertionOrder) throws Exception {
final List<MappingTask> allTasks = new ArrayList<>();

synchronized (refreshOrUpdateMutex) {
if (refreshOrUpdateQueue.isEmpty()) {
return currentState;
return Tuple.tuple(currentState, allTasks);
}

// we already processed this task in a bulk manner in a previous cluster event, simply ignore
// it so we will let other tasks get in and processed ones, we will handle the queued ones
// later on in a subsequent cluster state event
if (insertionOrder < refreshOrUpdateProcessedInsertOrder) {
return currentState;
return Tuple.tuple(currentState, allTasks);
}

allTasks.addAll(refreshOrUpdateQueue);
Expand All @@ -138,7 +136,7 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long
}

if (allTasks.isEmpty()) {
return currentState;
return Tuple.tuple(currentState, allTasks);
}

// break down to tasks per index, so we can optimize the on demand index service creation
Expand Down Expand Up @@ -245,25 +243,10 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long
}
}

// fork sending back updates, so we won't wait to send them back on the cluster state, there
// might be a few of those...
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (Object task : allTasks) {
if (task instanceof UpdateTask) {
UpdateTask uTask = (UpdateTask) task;
ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
uTask.listener.onResponse(response);
}
}
}
});

if (!dirty) {
return currentState;
return Tuple.tuple(currentState, allTasks);
}
return ClusterState.builder(currentState).metaData(mdBuilder).build();
return Tuple.tuple(ClusterState.builder(currentState).metaData(mdBuilder).build(), allTasks);
}

private boolean processIndexMappingTasks(List<MappingTask> tasks, IndexService indexService, IndexMetaData.Builder builder) {
Expand Down Expand Up @@ -349,15 +332,33 @@ public void refreshMapping(final String index, final String indexUUID, final Str
insertOrder = ++refreshOrUpdateInsertOrder;
refreshOrUpdateQueue.add(new RefreshTask(index, indexUUID, types));
}
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
private volatile List<MappingTask> allTasks;

@Override
public void onFailure(String source, Throwable t) {
logger.warn("failure during [{}]", t, source);
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return executeRefreshOrUpdate(currentState, insertOrder);
Tuple<ClusterState, List<MappingTask>> tuple = executeRefreshOrUpdate(currentState, insertOrder);
this.allTasks = tuple.v2();
return tuple.v1();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (allTasks == null) {
return;
}
for (Object task : allTasks) {
if (task instanceof UpdateTask) {
UpdateTask uTask = (UpdateTask) task;
ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
uTask.listener.onResponse(response);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should really do this in a try / catch block -- we should make sure that all listeners are all in even if one is bogus / throws and exception?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe have a helper method since the logic below does that though...

}
}
}
});
}
Expand All @@ -368,15 +369,36 @@ public void updateMapping(final String index, final String indexUUID, final Stri
insertOrder = ++refreshOrUpdateInsertOrder;
refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, order, nodeId, listener));
}
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "] / node [" + nodeId + "], order [" + order + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "] / node [" + nodeId + "], order [" + order + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
private volatile List<MappingTask> allTasks;

public void onFailure(String source, Throwable t) {
listener.onFailure(t);
}

@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
return executeRefreshOrUpdate(currentState, insertOrder);
Tuple<ClusterState, List<MappingTask>> tuple = executeRefreshOrUpdate(currentState, insertOrder);
this.allTasks = tuple.v2();
return tuple.v1();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (allTasks == null) {
return;
}
for (Object task : allTasks) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should spawn this to a background thread as this is still being run on the cluster state processing thread. Just be on the safe side.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about it, but I think this is the responsibility of the code that adds the listener, spawning (potentially a lot) of threads to callback is expensive, while most times a reply message will just be sent back

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I'm on the fence my self. It's just one (cached) thread we spawn per cluster task (which then executes all of the call backs). Not sure wether you saw this in the profiling as a potential time consumer when it was doing it before. I don't feel too strong about it though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, I didn't see it in profiling, but there isn't really a need for it since we just send back the response, I do think we need to properly handle failure if they happen, I will add the failure logic code and push, thanks!

if (task instanceof UpdateTask) {
UpdateTask uTask = (UpdateTask) task;
ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
try {
uTask.listener.onResponse(response);
} catch (Throwable t) {
logger.debug("failed ot ping back on response of mapping processing for task [{}]", t, uTask.listener);
}
}
}
}
});
}
Expand All @@ -400,7 +422,7 @@ public ClusterState execute(ClusterState currentState) {
for (String indexName : request.indices()) {
IndexMetaData indexMetaData = currentState.metaData().index(indexName);
IndexMetaData.Builder indexBuilder = IndexMetaData.builder(indexMetaData);

if (indexMetaData != null) {
boolean isLatestIndexWithout = true;
for (String type : request.types()) {
Expand Down