Mappings: Update mapping on master in async manner #6648

Closed
wants to merge 9 commits

TransportShardBulkAction.java
@@ -53,7 +53,9 @@
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
@@ -137,7 +139,8 @@ protected ShardIterator shards(ClusterState clusterState, BulkShardRequest reque
@Override
protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
final BulkShardRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);
Engine.IndexingOperation[] ops = null;
final Set<Tuple<String, String>> mappingsToUpdate = Sets.newHashSet();

@@ -180,7 +183,10 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
}
for (Tuple<String, String> mappingToUpdate : mappingsToUpdate) {
mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2(), true);
DocumentMapper docMapper = indexService.mapperService().documentMapper(mappingToUpdate.v2());
if (docMapper != null) {
mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), docMapper, indexService.indexUUID());
Contributor commented: cool. thx

}
}
throw (ElasticsearchException) e;
}
@@ -340,7 +346,10 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
}

for (Tuple<String, String> mappingToUpdate : mappingsToUpdate) {
mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), mappingToUpdate.v2(), true);
DocumentMapper docMapper = indexService.mapperService().documentMapper(mappingToUpdate.v2());
if (docMapper != null) {
mappingUpdatedAction.updateMappingOnMaster(mappingToUpdate.v1(), docMapper, indexService.indexUUID());
}
}

if (request.refresh()) {

TransportIndexAction.java
@@ -41,6 +41,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
@@ -184,7 +185,8 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
}
}

IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
long version;
@@ -193,7 +195,7 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
if (index.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
mappingUpdatedAction.updateMappingOnMaster(request.index(), index.docMapper(), indexService.indexUUID());
}
indexShard.index(index);
version = index.version();
@@ -203,7 +205,7 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
if (create.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
mappingUpdatedAction.updateMappingOnMaster(request.index(), create.docMapper(), indexService.indexUUID());
}
indexShard.create(create);
version = create.version();

MappingUpdatedAction.java
@@ -19,6 +19,8 @@

package org.elasticsearch.cluster.action.index;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
@@ -31,19 +33,25 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@@ -53,75 +61,49 @@
*/
public class MappingUpdatedAction extends TransportMasterNodeOperationAction<MappingUpdatedAction.MappingUpdatedRequest, MappingUpdatedAction.MappingUpdatedResponse> {

public static final String INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME = "indices.mapping.additional_mapping_change_time";

private final AtomicLong mappingUpdateOrderGen = new AtomicLong();
private final MetaDataMappingService metaDataMappingService;
private final IndicesService indicesService;

private final boolean waitForMappingChange;
private volatile MasterMappingUpdater masterMappingUpdater;

private volatile TimeValue additionalMappingChangeTime;

class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
final TimeValue current = MappingUpdatedAction.this.additionalMappingChangeTime;
final TimeValue newValue = settings.getAsTime(INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME, current);
if (!current.equals(newValue)) {
logger.info("updating " + INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME + " from [{}] to [{}]", current, newValue);
MappingUpdatedAction.this.additionalMappingChangeTime = newValue;
}
}
}

@Inject
public MappingUpdatedAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
MetaDataMappingService metaDataMappingService, IndicesService indicesService) {
MetaDataMappingService metaDataMappingService, NodeSettingsService nodeSettingsService) {
super(settings, transportService, clusterService, threadPool);
this.metaDataMappingService = metaDataMappingService;
this.indicesService = indicesService;
this.waitForMappingChange = settings.getAsBoolean("action.wait_on_mapping_change", false);
// this setting should probably always be 0, just add the option to wait for more changes within a time window
this.additionalMappingChangeTime = settings.getAsTime(INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME, TimeValue.timeValueMillis(0));
nodeSettingsService.addListener(new ApplySettings());
}

public void updateMappingOnMaster(String index, String type, boolean neverWaitForMappingChange) {
IndexMetaData metaData = clusterService.state().metaData().index(index);
if (metaData != null) {
updateMappingOnMaster(index, type, metaData.getUUID(), neverWaitForMappingChange);
}
public void start() {
this.masterMappingUpdater = new MasterMappingUpdater(EsExecutors.threadName(settings, "master_mapping_updater"));
this.masterMappingUpdater.start();
}

public void updateMappingOnMaster(String index, String type, String indexUUID, boolean neverWaitForMappingChange) {
final MapperService mapperService = indicesService.indexServiceSafe(index).mapperService();
final DocumentMapper documentMapper = mapperService.documentMapper(type);
if (documentMapper != null) { // should not happen
updateMappingOnMaster(documentMapper, index, type, indexUUID, neverWaitForMappingChange);
}
public void stop() {
this.masterMappingUpdater.close();
this.masterMappingUpdater = null;
}

public void updateMappingOnMaster(DocumentMapper documentMapper, String index, String type, String indexUUID, boolean neverWaitForMappingChange) {
final CountDownLatch latch = new CountDownLatch(1);
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest;
try {
// we generate the order id before we get the mapping to send and refresh the source, so
// if 2 happen concurrently, we know that the later order will include the previous one
long orderId = mappingUpdateOrderGen.incrementAndGet();
documentMapper.refreshSource();
DiscoveryNode node = clusterService.localNode();
mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest(
index, indexUUID, type, documentMapper.mappingSource(), orderId, node != null ? node.id() : null
);
} catch (Throwable t) {
logger.warn("Failed to update master on updated mapping for index [" + index + "], type [" + type + "]", t);
latch.countDown();
throw t;
}
logger.trace("Sending mapping updated to master: {}", mappingRequest);
execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
// all is well
latch.countDown();
logger.debug("Successfully updated master with mapping update: {}", mappingRequest);
}

@Override
public void onFailure(Throwable e) {
latch.countDown();
logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest);
}
});
if (waitForMappingChange && !neverWaitForMappingChange) {
try {
latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void updateMappingOnMaster(String index, DocumentMapper documentMapper, String indexUUID) {
masterMappingUpdater.add(new MappingChange(documentMapper, index, indexUUID));
}

@Override
@@ -256,4 +238,105 @@ public String toString() {
return "index [" + index + "], indexUUID [" + indexUUID + "], type [" + type + "] and source [" + mappingSource + "]";
}
}

private static class MappingChange {
public final DocumentMapper documentMapper;
public final String index;
public final String indexUUID;

MappingChange(DocumentMapper documentMapper, String index, String indexUUID) {
this.documentMapper = documentMapper;
this.index = index;
this.indexUUID = indexUUID;
}
}

/**
* The master mapping updater removes the overhead of refreshing the mapping (refreshSource) from the
* indexing thread.
* <p/>
* It also collapses multiple mapping updates for the same index (UUID) and type into a single update
* (refreshSource + sending to master), which reduces how often mappings are refreshed and sent to the
* master under heavy single-index indexing that keeps introducing new mappings, and, when multiple
* shards of the index exist on the same node, lets that work happen once at the index level.
*/
private class MasterMappingUpdater extends Thread {
Member commented: Can we implement Runnable instead of extending Thread?

Member (author) replied: I thought about it, but then we would need to keep the Runnable around, with the Thread as a separate field. I like this encapsulation; I don't think extending Thread is such a bad idea for this case. I will switch if there is strong sentiment about it.
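For illustration only (this is not part of the PR), the composition-based variant the reviewer suggests might look roughly like the sketch below; the names MasterMappingUpdaterTask and updaterThread are hypothetical, and the loop body would carry the same poll/batch/send logic as MasterMappingUpdater.run() further down.

// Editor's sketch, not part of this change.
private static class MasterMappingUpdaterTask implements Runnable {
    private volatile boolean running = true;
    private final BlockingQueue<MappingChange> queue = ConcurrentCollections.newBlockingQueue();

    void add(MappingChange change) {
        queue.add(change);
    }

    void close(Thread thread) {
        running = false;
        thread.interrupt();
    }

    @Override
    public void run() {
        while (running) {
            try {
                MappingChange change = queue.poll(10, TimeUnit.MINUTES);
                if (change == null) {
                    continue;
                }
                // batch, de-duplicate and send to the master here, as MasterMappingUpdater.run() does below
            } catch (InterruptedException e) {
                // interrupted by close(); the while (running) check exits the loop
            }
        }
    }
}

// The enclosing service would then hold the task and its thread as two fields, for example:
//     masterMappingUpdaterTask = new MasterMappingUpdaterTask();
//     updaterThread = EsExecutors.daemonThreadFactory(settings, "master_mapping_updater").newThread(masterMappingUpdaterTask);
//     updaterThread.start();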


private volatile boolean running = true;
private final BlockingQueue<MappingChange> queue = ConcurrentCollections.newBlockingQueue();

public MasterMappingUpdater(String name) {
super(name);
}

public void add(MappingChange change) {
queue.add(change);
}

public void close() {
running = false;
this.interrupt();
}

@Override
public void run() {
while (running) {
try {
MappingChange polledChange = queue.poll(10, TimeUnit.MINUTES);
if (polledChange == null) {
continue;
}
List<MappingChange> changes = Lists.newArrayList(polledChange);
if (additionalMappingChangeTime.millis() > 0) {
Thread.sleep(additionalMappingChangeTime.millis());
Contributor commented: I think we should catch and ignore interrupts here, to reduce the chance we lose a mapping update?

Member (author) replied: The outer catch for the interrupt should handle it, I think; if someone interrupts this thread, it is only when it is shutting down. I can add a check and log if it is still in the running state.

Contributor replied: What I mean is that if we are sleeping here, we know for sure that there is a change event that should have gone to the master. If we catch the interrupt and just let it flow through the code, it will first send the change and then check running and stop.
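As a minimal sketch of the reviewer's suggestion (not part of this change), the sleep above could swallow an interrupt so the already-polled change is still sent before the running flag stops the loop:

// Editor's sketch only; uses the surrounding additionalMappingChangeTime and running fields.
try {
    Thread.sleep(additionalMappingChangeTime.millis());
} catch (InterruptedException e) {
    // keep going: the pending change is sent below; shutdown is detected by the while (running) check.
    // Optionally restore the flag so later blocking calls still see it: Thread.currentThread().interrupt();
}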

}
queue.drainTo(changes);
Collections.reverse(changes); // process them from newest to oldest
Set<Tuple<String, String>> seenIndexAndTypes = Sets.newHashSet();
for (MappingChange change : changes) {
Tuple<String, String> checked = Tuple.tuple(change.indexUUID, change.documentMapper.type());
if (seenIndexAndTypes.contains(checked)) {
continue;
}
seenIndexAndTypes.add(checked);

final MappingUpdatedAction.MappingUpdatedRequest mappingRequest;
try {
// we generate the order id before we get the mapping to send and refresh the source, so
// if 2 happen concurrently, we know that the later order will include the previous one
long orderId = mappingUpdateOrderGen.incrementAndGet();
change.documentMapper.refreshSource();
DiscoveryNode node = clusterService.localNode();
mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest(
change.index, change.indexUUID, change.documentMapper.type(), change.documentMapper.mappingSource(), orderId, node != null ? node.id() : null
Contributor commented: I have a concern regarding batching and the fact that we use the first change object. Since we don't directly access the MapperService, we might fail to send a newer version of the doc. I like the change made to updateMappingOnMaster to keep only one variant. Maybe we should keep just one that refers to the type as a string and always resolve the current doc mapper here. (See the sketch after this file's diff.)

Member (author) replied: I wasn't that concerned about it, but to be on the safe side, I reversed the order of the events.

);
} catch (Throwable t) {
logger.warn("Failed to update master on updated mapping for index [" + change.index + "], type [" + change.documentMapper.type() + "]", t);
continue;
}
logger.trace("sending mapping updated to master: {}", mappingRequest);
execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
logger.debug("successfully updated master with mapping update: {}", mappingRequest);
}

@Override
public void onFailure(Throwable e) {
logger.warn("failed to update master on updated mapping for {}", e, mappingRequest);
}
});

}
} catch (InterruptedException e) {
// are we shutting down? continue and check
if (running) {
logger.warn("failed to process mapping updates", e);
}
} catch (Throwable t) {
logger.warn("failed to process mapping updates", t);
}
}
}
}
}
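As an editorial sketch of the alternative the reviewer raises above (resolving the current mapper from the MapperService at send time, rather than carrying the DocumentMapper captured when the change was queued), a helper along these lines could sit inside MappingUpdatedAction. The method name sendMappingUpdate and the injected indicesService field are assumptions, not part of this PR; the other fields (mappingUpdateOrderGen, clusterService, logger) exist in the class as shown in the diff.

// Editor's sketch, not part of this change: always send the latest mapping source for (index, type).
private void sendMappingUpdate(String index, String indexUUID, String type) {
    IndexService indexService = indicesService.indexService(index); // null if the index was removed meanwhile
    if (indexService == null) {
        return;
    }
    DocumentMapper docMapper = indexService.mapperService().documentMapper(type);
    if (docMapper == null) {
        return; // the type mapping no longer exists
    }
    // generate the order id before refreshing the source, so concurrent updates stay ordered
    long orderId = mappingUpdateOrderGen.incrementAndGet();
    docMapper.refreshSource();
    DiscoveryNode node = clusterService.localNode();
    final MappingUpdatedRequest request = new MappingUpdatedRequest(
            index, indexUUID, type, docMapper.mappingSource(), orderId, node != null ? node.id() : null);
    execute(request, new ActionListener<MappingUpdatedResponse>() {
        @Override
        public void onResponse(MappingUpdatedResponse response) {
            logger.debug("successfully updated master with mapping update: {}", request);
        }

        @Override
        public void onFailure(Throwable e) {
            logger.warn("failed to update master on updated mapping for {}", e, request);
        }
    });
}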

ClusterDynamicSettingsModule.java
@@ -21,6 +21,7 @@

import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.*;
@@ -62,6 +63,7 @@ public ClusterDynamicSettingsModule() {
clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_TYPE);
clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
clusterDynamicSettings.addDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME);
clusterDynamicSettings.addDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME, Validator.TIME);
clusterDynamicSettings.addDynamicSetting(MetaData.SETTING_READ_ONLY);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, Validator.BYTES_SIZE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, Validator.INTEGER);
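Because the new batching window is registered here as a dynamic cluster setting, it can be changed at runtime through the cluster update settings API. A minimal sketch with the 1.x Java client; the client instance and the "100ms" value are assumptions for illustration only:

// Editor's sketch: adjust the additional mapping-change batching window on a running cluster.
// ImmutableSettings is org.elasticsearch.common.settings.ImmutableSettings (1.x).
client.admin().cluster().prepareUpdateSettings()
        .setTransientSettings(ImmutableSettings.settingsBuilder()
                .put("indices.mapping.additional_mapping_change_time", "100ms")
                .build())
        .execute().actionGet();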