Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for mapping updates during local recovery #6666

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.action.index;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -48,9 +50,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -103,7 +103,11 @@ public void stop() {
}

public void updateMappingOnMaster(String index, DocumentMapper documentMapper, String indexUUID) {
masterMappingUpdater.add(new MappingChange(documentMapper, index, indexUUID));
updateMappingOnMaster(index, documentMapper, indexUUID, null);
}

public void updateMappingOnMaster(String index, DocumentMapper documentMapper, String indexUUID, MappingUpdateListener listener) {
masterMappingUpdater.add(new MappingChange(documentMapper, index, indexUUID, listener));
}

@Override
Expand Down Expand Up @@ -243,14 +247,26 @@ private static class MappingChange {
public final DocumentMapper documentMapper;
public final String index;
public final String indexUUID;
public final MappingUpdateListener listener;

MappingChange(DocumentMapper documentMapper, String index, String indexUUID) {
MappingChange(DocumentMapper documentMapper, String index, String indexUUID, MappingUpdateListener listener) {
this.documentMapper = documentMapper;
this.index = index;
this.indexUUID = indexUUID;
this.listener = listener;
}
}

/**
* A listener to be notified when the mappings were updated
*/
public static interface MappingUpdateListener {

void onMappingUpdate();

void onFailure(Throwable t);
}

/**
* The master mapping updater removes the overhead of refreshing the mapping (refreshSource) on the
* indexing thread.
Expand Down Expand Up @@ -278,8 +294,62 @@ public void close() {
this.interrupt();
}

class UpdateKey {
public final String indexUUID;
public final String type;

UpdateKey(String indexUUID, String type) {
this.indexUUID = indexUUID;
this.type = type;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

UpdateKey updateKey = (UpdateKey) o;

if (!indexUUID.equals(updateKey.indexUUID)) return false;
if (!type.equals(updateKey.type)) return false;

return true;
}

@Override
public int hashCode() {
int result = indexUUID.hashCode();
result = 31 * result + type.hashCode();
return result;
}
}

class UpdateValue {
public final MappingChange mainChange;
public final List<MappingUpdateListener> listeners = Lists.newArrayList();

UpdateValue(MappingChange mainChange) {
this.mainChange = mainChange;
}

public void notifyListeners(@Nullable Throwable t) {
for (MappingUpdateListener listener : listeners) {
try {
if (t == null) {
listener.onMappingUpdate();
} else {
listener.onFailure(t);
}
} catch (Throwable lisFailure) {
logger.warn("unexpected failure on mapping update listener callback [{}]", lisFailure, listener);
}
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these classes be static?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they can't at their current placement, because they are in non static inner class. they can be moved to outside, and be made static, but then, at least for the value class, we will need to pass the logger as well. I like it like this, feels more encapsulated, and the perf overhead of non static classes should be insignificant in the context of this operation (mapping updates)


@Override
public void run() {
Map<UpdateKey, UpdateValue> pendingUpdates = Maps.newHashMap();
while (running) {
try {
MappingChange polledChange = queue.poll(10, TimeUnit.MINUTES);
Expand All @@ -292,13 +362,23 @@ public void run() {
}
queue.drainTo(changes);
Collections.reverse(changes); // process then in newest one to oldest
Set<Tuple<String, String>> seenIndexAndTypes = Sets.newHashSet();
// go over and add to pending updates map
for (MappingChange change : changes) {
Tuple<String, String> checked = Tuple.tuple(change.indexUUID, change.documentMapper.type());
if (seenIndexAndTypes.contains(checked)) {
continue;
UpdateKey key = new UpdateKey(change.indexUUID, change.documentMapper.type());
UpdateValue updateValue = pendingUpdates.get(key);
if (updateValue == null) {
updateValue = new UpdateValue(change);
pendingUpdates.put(key, updateValue);
}
seenIndexAndTypes.add(checked);
if (change.listener != null) {
updateValue.listeners.add(change.listener);
}
}

for (Iterator<UpdateValue> iterator = pendingUpdates.values().iterator(); iterator.hasNext(); ) {
final UpdateValue updateValue = iterator.next();
iterator.remove();
MappingChange change = updateValue.mainChange;

final MappingUpdatedAction.MappingUpdatedRequest mappingRequest;
try {
Expand All @@ -312,29 +392,37 @@ public void run() {
);
} catch (Throwable t) {
logger.warn("Failed to update master on updated mapping for index [" + change.index + "], type [" + change.documentMapper.type() + "]", t);
updateValue.notifyListeners(t);
continue;
}
logger.trace("sending mapping updated to master: {}", mappingRequest);
execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
logger.debug("successfully updated master with mapping update: {}", mappingRequest);
updateValue.notifyListeners(null);
}

@Override
public void onFailure(Throwable e) {
logger.warn("failed to update master on updated mapping for {}", e, mappingRequest);
updateValue.notifyListeners(e);
}
});

}
} catch (InterruptedException e) {
// are we shutting down? continue and check
if (running) {
logger.warn("failed to process mapping updates", e);
}
} catch (Throwable t) {
logger.warn("failed to process mapping updates", t);
if (t instanceof InterruptedException && !running) {
// all is well, we are shutting down
} else {
logger.warn("failed to process mapping updates", t);
}
// cleanup all pending update callbacks that were not processed due to a global failure...
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it make sense to do it even if running is false?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean the cleanup? yea, so I thought it made sense since there might be other blocking registered blocking operations on it, so might as well release them as quickly as possible

for (Iterator<Map.Entry<UpdateKey, UpdateValue>> iterator = pendingUpdates.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<UpdateKey, UpdateValue> entry = iterator.next();
iterator.remove();
entry.getValue().notifyListeners(t);
}
}
}
}
Expand Down
Expand Up @@ -19,12 +19,14 @@

package org.elasticsearch.index.gateway.local;

import com.google.common.collect.Sets;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -52,7 +54,10 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
*
Expand All @@ -64,6 +69,8 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
private final IndexService indexService;
private final InternalIndexShard indexShard;

private final TimeValue waitForMappingUpdatePostRecovery;

private final RecoveryState recoveryState = new RecoveryState();

private volatile ScheduledFuture flushScheduler;
Expand All @@ -78,6 +85,7 @@ public LocalIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSett
this.indexService = indexService;
this.indexShard = (InternalIndexShard) indexShard;

this.waitForMappingUpdatePostRecovery = componentSettings.getAsTime("wait_for_mapping_update_post_recovery", TimeValue.timeValueSeconds(30));
syncInterval = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(5));
if (syncInterval.millis() > 0) {
this.indexShard.translog().syncOnEachOperation(false);
Expand Down Expand Up @@ -215,6 +223,8 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro
recoveryState.getTranslog().startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
FileInputStream fs = null;

final Set<String> typesToUpdate = Sets.newHashSet();
try {
fs = new FileInputStream(recoveringTranslogFile);
InputStreamStreamInput si = new InputStreamStreamInput(fs);
Expand All @@ -232,8 +242,10 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro
}
try {
Engine.IndexingOperation potentialIndexOperation = indexShard.performRecoveryOperation(operation);
if (potentialIndexOperation != null) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), potentialIndexOperation.docMapper(), indexService.indexUUID());
if (potentialIndexOperation != null && potentialIndexOperation.parsedDoc().mappingsModified()) {
if (!typesToUpdate.contains(potentialIndexOperation.docMapper().type())) {
typesToUpdate.add(potentialIndexOperation.docMapper().type());
}
}
recoveryState.getTranslog().addTranslogOperations(1);
} catch (ElasticsearchException e) {
Expand All @@ -260,6 +272,31 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro

recoveringTranslogFile.delete();

for (final String type : typesToUpdate) {
final CountDownLatch latch = new CountDownLatch(1);
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), indexService.mapperService().documentMapper(type), indexService.indexUUID(), new MappingUpdatedAction.MappingUpdateListener() {
@Override
public void onMappingUpdate() {
latch.countDown();
}

@Override
public void onFailure(Throwable t) {
latch.countDown();
logger.debug("failed to send mapping update post recovery to master for [{}]", t, type);
}
});

try {
boolean waited = latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS);
if (!waited) {
logger.debug("waited for mapping update on master for [{}], yet timed out");
}
} catch (InterruptedException e) {
logger.debug("interrupted while waiting for mapping update");
}
}

recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getTranslog().startTime());
}

Expand Down