Skip to content

Commit

Permalink
Optimize dynamic mapping updates on master by processing latest one p…
Browse files Browse the repository at this point in the history
…er index/node

Instead of processing all the bulk of update mappings we have per index/node, we can only update the last ordered one out of those (cause they are incremented on the node/index level). This will improve the processing time of an index that have large updates of mappings.
closes elastic#4373
  • Loading branch information
kimchy committed Dec 7, 2013
1 parent 697b810 commit f68d1a0
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -607,11 +608,18 @@ private void updateMappingOnMaster(final String index, final String type) {
if (documentMapper == null) { // should not happen
return;
}
documentMapper.refreshSource();

IndexMetaData metaData = clusterService.state().metaData().index(index);
if (metaData == null) {
return;
}

// we generate the order id before we get the mapping to send and refresh the source, so
// if 2 happen concurrently, we know that the later order will include the previous one
long orderId = mappingUpdatedAction.generateNextMappingUpdateOrder();
documentMapper.refreshSource();

final MappingUpdatedAction.MappingUpdatedRequest request = new MappingUpdatedAction.MappingUpdatedRequest(index, metaData.uuid(), type, documentMapper.mappingSource());
DiscoveryNode node = clusterService.localNode();
final MappingUpdatedAction.MappingUpdatedRequest request = new MappingUpdatedAction.MappingUpdatedRequest(index, metaData.uuid(), type, documentMapper.mappingSource(), orderId, node != null ? node.id() : null);
mappingUpdatedAction.execute(request, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -287,9 +288,13 @@ private void updateMappingOnMaster(final IndexRequest request, IndexMetaData ind
if (documentMapper == null) { // should not happen
return;
}
// we generate the order id before we get the mapping to send and refresh the source, so
// if 2 happen concurrently, we know that the later order will include the previous one
long orderId = mappingUpdatedAction.generateNextMappingUpdateOrder();
documentMapper.refreshSource();
DiscoveryNode node = clusterService.localNode();
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest =
new MappingUpdatedAction.MappingUpdatedRequest(request.index(), indexMetaData.uuid(), request.type(), documentMapper.mappingSource());
new MappingUpdatedAction.MappingUpdatedRequest(request.index(), indexMetaData.uuid(), request.type(), documentMapper.mappingSource(), orderId, node != null ? node.id() : null);
logger.trace("Sending mapping updated to master: {}", mappingRequest);
mappingUpdatedAction.execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
*/
public class MappingUpdatedAction extends TransportMasterNodeOperationAction<MappingUpdatedAction.MappingUpdatedRequest, MappingUpdatedAction.MappingUpdatedResponse> {

private final AtomicLong mappingUpdateOrderGen = new AtomicLong();
private final MetaDataMappingService metaDataMappingService;

@Inject
Expand All @@ -57,6 +59,10 @@ public MappingUpdatedAction(Settings settings, TransportService transportService
this.metaDataMappingService = metaDataMappingService;
}

public long generateNextMappingUpdateOrder() {
return mappingUpdateOrderGen.incrementAndGet();
}

@Override
protected String transportAction() {
return "cluster/mappingUpdated";
Expand All @@ -80,7 +86,7 @@ protected MappingUpdatedResponse newResponse() {

@Override
protected void masterOperation(final MappingUpdatedRequest request, final ClusterState state, final ActionListener<MappingUpdatedResponse> listener) throws ElasticSearchException {
metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), new ClusterStateUpdateListener() {
metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), request.order, request.nodeId, new ClusterStateUpdateListener() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new MappingUpdatedResponse());
Expand Down Expand Up @@ -112,15 +118,19 @@ public static class MappingUpdatedRequest extends MasterNodeOperationRequest<Map
private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
private String type;
private CompressedString mappingSource;
private long order = -1; // -1 means not set...
private String nodeId = null; // null means not set

MappingUpdatedRequest() {
}

public MappingUpdatedRequest(String index, String indexUUID, String type, CompressedString mappingSource) {
public MappingUpdatedRequest(String index, String indexUUID, String type, CompressedString mappingSource, long order, String nodeId) {
this.index = index;
this.indexUUID = indexUUID;
this.type = type;
this.mappingSource = mappingSource;
this.order = order;
this.nodeId = nodeId;
}

public String index() {
Expand All @@ -139,6 +149,20 @@ public CompressedString mappingSource() {
return mappingSource;
}

/**
* Returns -1 if not set...
*/
public long order() {
return this.order;
}

/**
* Returns null for not set.
*/
public String nodeId() {
return this.nodeId;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -153,6 +177,10 @@ public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_0_90_6)) {
indexUUID = in.readString();
}
if (in.getVersion().after(Version.V_0_90_7)) {
order = in.readLong();
nodeId = in.readOptionalString();
}
}

@Override
Expand All @@ -164,6 +192,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_0_90_6)) {
out.writeString(indexUUID);
}
if (out.getVersion().after(Version.V_0_90_7)) {
out.writeLong(order);
out.writeOptionalString(nodeId);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
Expand All @@ -48,9 +47,9 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidTypeNameException;
import org.elasticsearch.indices.TypeMissingException;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.*;
import java.util.concurrent.BlockingQueue;

import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.index.mapper.DocumentMapper.MergeFlags.mergeFlags;
Expand All @@ -60,15 +59,20 @@
*/
public class MetaDataMappingService extends AbstractComponent {

private final ThreadPool threadPool;
private final ClusterService clusterService;

private final IndicesService indicesService;

private final BlockingQueue<MappingTask> refreshOrUpdateQueue = ConcurrentCollections.newBlockingQueue();
// the mutex protect all the refreshOrUpdate variables!
private final Object refreshOrUpdateMutex = new Object();
private final List<MappingTask> refreshOrUpdateQueue = new ArrayList<MappingTask>();
private long refreshOrUpdateInsertOrder;
private long refreshOrUpdateProcessedInsertOrder;

@Inject
public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService) {
public MetaDataMappingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indicesService = indicesService;
}
Expand All @@ -95,12 +99,16 @@ static class RefreshTask extends MappingTask {
static class UpdateTask extends MappingTask {
final String type;
final CompressedString mappingSource;
final long order; // -1 for unknown
final String nodeId; // null fr unknown
final ClusterStateUpdateListener listener;

UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, ClusterStateUpdateListener listener) {
UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, long order, String nodeId, ClusterStateUpdateListener listener) {
super(index, indexUUID);
this.type = type;
this.mappingSource = mappingSource;
this.order = order;
this.nodeId = nodeId;
this.listener = listener;
}
}
Expand All @@ -110,9 +118,26 @@ static class UpdateTask extends MappingTask {
* as possible so we won't create the same index all the time for example for the updates on the same mapping
* and generate a single cluster change event out of all of those.
*/
ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exception {
List<MappingTask> allTasks = new ArrayList<MappingTask>();
refreshOrUpdateQueue.drainTo(allTasks);
ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long insertionOrder) throws Exception {
final List<MappingTask> allTasks = new ArrayList<MappingTask>();

synchronized (refreshOrUpdateMutex) {
if (refreshOrUpdateQueue.isEmpty()) {
return currentState;
}

// we already processed this task in a bulk manner in a previous cluster event, simply ignore
// it so we will let other tasks get in and processed ones, we will handle the queued ones
// later on in a subsequent cluster state event
if (insertionOrder < refreshOrUpdateProcessedInsertOrder) {
return currentState;
}

allTasks.addAll(refreshOrUpdateQueue);
refreshOrUpdateQueue.clear();

refreshOrUpdateProcessedInsertOrder = refreshOrUpdateInsertOrder;
}

if (allTasks.isEmpty()) {
return currentState;
Expand All @@ -131,32 +156,61 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exce
tasksPerIndex.put(task.index, indexTasks);
}
indexTasks.add(task);

}

boolean dirty = false;
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
for (Map.Entry<String, List<MappingTask>> entry : tasksPerIndex.entrySet()) {
String index = entry.getKey();
List<MappingTask> tasks = entry.getValue();
final IndexMetaData indexMetaData = mdBuilder.get(index);
if (indexMetaData == null) {
// index got deleted on us, ignore...
logger.debug("[{}] ignoring tasks - index meta data doesn't exist", index);
continue;
}
// the tasks lists to iterate over, filled with the list of mapping tasks, trying to keep
// the latest (based on order) update mapping one per node
List<MappingTask> allIndexTasks = entry.getValue();
List<MappingTask> tasks = new ArrayList<MappingTask>();
for (MappingTask task : allIndexTasks) {
if (!indexMetaData.isSameUUID(task.indexUUID)) {
logger.debug("[{}] ignoring task [{}] - index meta data doesn't match task uuid", index, task);
continue;
}
boolean add = true;
// if its an update task, make sure we only process the latest ordered one per node
if (task instanceof UpdateTask) {
UpdateTask uTask = (UpdateTask) task;
// we can only do something to compare if we have the order && node
if (uTask.order != -1 && uTask.nodeId != null) {
for (int i = 0; i < tasks.size(); i++) {
MappingTask existing = tasks.get(i);
if (existing instanceof UpdateTask) {
UpdateTask eTask = (UpdateTask) existing;
// if we have the order, and the node id, then we can compare, and replace if applicable
if (eTask.order != -1 && eTask.nodeId != null) {
if (eTask.nodeId.equals(uTask.nodeId) && uTask.order > eTask.order) {
// a newer update task, we can replace so we execute it one!
tasks.set(i, uTask);
add = false;
break;
}
}
}
}
}
}

if (add) {
tasks.add(task);
}
}

boolean removeIndex = false;
// keep track of what we already refreshed, no need to refresh it again...
Set<String> processedRefreshes = Sets.newHashSet();
try {
for (MappingTask task : tasks) {
final IndexMetaData indexMetaData = mdBuilder.get(index);
if (indexMetaData == null) {
// index got deleted on us, ignore...
logger.debug("[{}] ignoring task [{}] - index meta data doesn't exist", index, task);
continue;
}

if (!indexMetaData.isSameUUID(task.indexUUID)) {
// index got deleted on us, ignore...
logger.debug("[{}] ignoring task [{}] - index meta data doesn't match task uuid", index, task);
continue;
}

if (task instanceof RefreshTask) {
RefreshTask refreshTask = (RefreshTask) task;
try {
Expand Down Expand Up @@ -248,13 +302,24 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exce
if (removeIndex) {
indicesService.removeIndex(index, "created for mapping processing");
}
for (Object task : tasks) {
}
}

// fork sending back updates, so we won't wait to send them back on the cluster state, there
// might be a few of those...
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (Object task : allTasks) {
if (task instanceof UpdateTask) {
((UpdateTask) task).listener.onResponse(new ClusterStateUpdateResponse(true));
UpdateTask uTask = (UpdateTask) task;
ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
uTask.listener.onResponse(response);
}
}
}
}
});


if (!dirty) {
return currentState;
Expand All @@ -266,7 +331,11 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exce
* Refreshes mappings if they are not the same between original and parsed version
*/
public void refreshMapping(final String index, final String indexUUID, final String... types) {
refreshOrUpdateQueue.add(new RefreshTask(index, indexUUID, types));
final long insertOrder;
synchronized (refreshOrUpdateMutex) {
insertOrder = ++refreshOrUpdateInsertOrder;
refreshOrUpdateQueue.add(new RefreshTask(index, indexUUID, types));
}
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public void onFailure(String source, Throwable t) {
Expand All @@ -275,13 +344,17 @@ public void onFailure(String source, Throwable t) {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return executeRefreshOrUpdate(currentState);
return executeRefreshOrUpdate(currentState, insertOrder);
}
});
}

public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final ClusterStateUpdateListener listener) {
refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, listener));
public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final long order, final String nodeId, final ClusterStateUpdateListener listener) {
final long insertOrder;
synchronized (refreshOrUpdateMutex) {
insertOrder = ++refreshOrUpdateInsertOrder;
refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, order, nodeId, listener));
}
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public void onFailure(String source, Throwable t) {
Expand All @@ -290,7 +363,7 @@ public void onFailure(String source, Throwable t) {

@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
return executeRefreshOrUpdate(currentState);
return executeRefreshOrUpdate(currentState, insertOrder);
}
});
}
Expand Down

0 comments on commit f68d1a0

Please sign in to comment.