Batch processing mapping updates can cause missed merged mappings when batching multiple types

When we batch mapping changes, we need to use the same index metadata builder across the tasks; otherwise we might remove mappings erroneously.
Also, when we check whether we can use a higher-order mapping, we need to verify that it is for the same mapping type.
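The core of the fix is to thread a single index metadata builder through every mapping task for an index. Below is a standalone Java sketch of the failure mode; plain maps stand in for the Elasticsearch metadata types, and this is illustrative code, not part of the commit:

    import java.util.HashMap;
    import java.util.Map;

    // If each task in a batch snapshots the original index metadata and writes
    // its own copy back to the cluster-state builder, the last write wins and
    // mappings merged by earlier tasks are silently dropped.
    public class LostMappingDemo {
        public static void main(String[] args) {
            Map<String, String> original = new HashMap<>();
            original.put("type_a", "{old_a}");

            // Buggy pattern: every task starts from the ORIGINAL snapshot.
            Map<String, String> afterTaskA = new HashMap<>(original);
            afterTaskA.put("type_a", "{new_a}");
            Map<String, String> afterTaskB = new HashMap<>(original); // ignores task A's result
            afterTaskB.put("type_b", "{new_b}");
            System.out.println("buggy: " + afterTaskB);  // {new_a} is gone

            // Fixed pattern: one shared builder accumulates all tasks for the index.
            Map<String, String> shared = new HashMap<>(original);
            shared.put("type_a", "{new_a}");  // task A
            shared.put("type_b", "{new_b}");  // task B sees task A's result
            System.out.println("fixed: " + shared);      // both updates kept
        }
    }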
  • Loading branch information...
kimchy committed Dec 11, 2013
1 parent a760f1f commit 10cdb0ae221a672bbc348140a30454d5aab29554
@@ -466,6 +466,10 @@ public Builder settings(Settings settings) {
             return this;
         }
 
+        public MappingMetaData mapping(String type) {
+            return mappings.get(type);
+        }
+
         public Builder removeMapping(String mappingType) {
             mappings.remove(mappingType);
             return this;
@@ -161,9 +161,10 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long
         boolean dirty = false;
         MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
 
         for (Map.Entry<String, List<MappingTask>> entry : tasksPerIndex.entrySet()) {
             String index = entry.getKey();
-            final IndexMetaData indexMetaData = mdBuilder.get(index);
+            IndexMetaData indexMetaData = mdBuilder.get(index);
             if (indexMetaData == null) {
                 // index got deleted on us, ignore...
                 logger.debug("[{}] ignoring tasks - index meta data doesn't exist", index);
@@ -188,13 +189,15 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long
MappingTask existing = tasks.get(i);
if (existing instanceof UpdateTask) {
UpdateTask eTask = (UpdateTask) existing;
// if we have the order, and the node id, then we can compare, and replace if applicable
if (eTask.order != -1 && eTask.nodeId != null) {
if (eTask.nodeId.equals(uTask.nodeId) && uTask.order > eTask.order) {
// a newer update task, we can replace so we execute it one!
tasks.set(i, uTask);
add = false;
break;
if (eTask.type.equals(uTask.type)) {
// if we have the order, and the node id, then we can compare, and replace if applicable
if (eTask.order != -1 && eTask.nodeId != null) {
if (eTask.nodeId.equals(uTask.nodeId) && uTask.order > eTask.order) {
// a newer update task, we can replace so we execute it one!
tasks.set(i, uTask);
add = false;
break;
}
}
}
}
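A standalone sketch of the replacement rule after this change, using a simplified, hypothetical Task class (the real code additionally guards on order != -1 and a non-null nodeId): a queued update may only be superseded by a newer update from the same node and for the same mapping type.

    import java.util.ArrayList;
    import java.util.List;

    public class TaskDedupSketch {
        static class Task {
            final String nodeId; final String type; final long order;
            Task(String nodeId, String type, long order) {
                this.nodeId = nodeId; this.type = type; this.order = order;
            }
        }

        static void addOrReplace(List<Task> tasks, Task incoming) {
            for (int i = 0; i < tasks.size(); i++) {
                Task existing = tasks.get(i);
                if (existing.type.equals(incoming.type)          // same mapping type (the fix)
                        && existing.nodeId.equals(incoming.nodeId)
                        && incoming.order > existing.order) {
                    tasks.set(i, incoming);                      // newer update replaces older
                    return;
                }
            }
            tasks.add(incoming);
        }

        public static void main(String[] args) {
            List<Task> tasks = new ArrayList<>();
            addOrReplace(tasks, new Task("node1", "type_a", 1));
            addOrReplace(tasks, new Task("node1", "type_b", 2)); // different type: must NOT replace type_a
            addOrReplace(tasks, new Task("node1", "type_a", 3)); // same type, newer: replaces order 1
            for (Task t : tasks) System.out.println(t.type + "@" + t.order);
        }
    }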
@@ -207,98 +210,37 @@ ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long
                     }
                 }
 
            // construct the actual index if needed, and make sure the relevant mappings are there
            boolean removeIndex = false;
-            // keep track of what we already refreshed, no need to refresh it again...
-            Set<String> processedRefreshes = Sets.newHashSet();
-            try {
+            IndexService indexService = indicesService.indexService(index);
+            if (indexService == null) {
+                // we need to create the index here, and add the current mapping to it, so we can merge
+                indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
+                removeIndex = true;
+                Set<String> typesToIntroduce = Sets.newHashSet();
                 for (MappingTask task : tasks) {
-                    if (task instanceof RefreshTask) {
-                        RefreshTask refreshTask = (RefreshTask) task;
-                        try {
-                            IndexService indexService = indicesService.indexService(index);
-                            if (indexService == null) {
-                                // we need to create the index here, and add the current mapping to it, so we can merge
-                                indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
-                                removeIndex = true;
-                                for (String type : refreshTask.types) {
-                                    // only add the current relevant mapping (if exists)
-                                    if (indexMetaData.mappings().containsKey(type)) {
-                                        // don't apply the default mapping, it has been applied when the mapping was created
-                                        indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source(), false);
-                                    }
-                                }
-                            }
-                            IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(indexMetaData);
-                            List<String> updatedTypes = Lists.newArrayList();
-                            for (String type : refreshTask.types) {
-                                if (processedRefreshes.contains(type)) {
-                                    continue;
-                                }
-                                DocumentMapper mapper = indexService.mapperService().documentMapper(type);
-                                if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) {
-                                    updatedTypes.add(type);
-                                    indexMetaDataBuilder.putMapping(new MappingMetaData(mapper));
-                                }
-                                processedRefreshes.add(type);
-                            }
-                            if (updatedTypes.isEmpty()) {
-                                continue;
-                            }
-                            logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
-                            mdBuilder.put(indexMetaDataBuilder);
-                            dirty = true;
-                        } catch (Throwable t) {
-                            logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types);
-                        }
-                    } else if (task instanceof UpdateTask) {
-                        UpdateTask updateTask = (UpdateTask) task;
-                        try {
-                            String type = updateTask.type;
-                            CompressedString mappingSource = updateTask.mappingSource;
-                            if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(mappingSource)) {
-                                logger.debug("[{}] update_mapping [{}] ignoring mapping update task as its source is equal to ours", index, updateTask.type);
-                                continue;
-                            }
-                            IndexService indexService = indicesService.indexService(index);
-                            if (indexService == null) {
-                                // we need to create the index here, and add the current mapping to it, so we can merge
-                                indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
-                                removeIndex = true;
-                                // only add the current relevant mapping (if exists)
-                                if (indexMetaData.mappings().containsKey(type)) {
-                                    indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source(), false);
-                                }
-                            }
-                            DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource, false);
-                            processedRefreshes.add(type);
-                            // if we end up with the same mapping as the original once, ignore
-                            if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) {
-                                logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it results in the same source as what we have", index, updateTask.type);
-                                continue;
-                            }
-                            // build the updated mapping source
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource());
-                            } else if (logger.isInfoEnabled()) {
-                                logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
-                            }
-                            mdBuilder.put(IndexMetaData.builder(indexMetaData).putMapping(new MappingMetaData(updatedMapper)));
-                            dirty = true;
-                        } catch (Throwable t) {
-                            logger.warn("[{}] failed to update-mapping in cluster state, type [{}]", index, updateTask.type);
-                        }
-                    } else {
-                        logger.warn("illegal state, got wrong mapping task type [{}]", task);
+                    if (task instanceof UpdateTask) {
+                        typesToIntroduce.add(((UpdateTask) task).type);
+                    } else if (task instanceof RefreshTask) {
+                        Collections.addAll(typesToIntroduce, ((RefreshTask) task).types);
                     }
                 }
+                for (String type : typesToIntroduce) {
+                    // only add the current relevant mapping (if exists)
+                    if (indexMetaData.mappings().containsKey(type)) {
+                        // don't apply the default mapping, it has been applied when the mapping was created
+                        indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source(), false);
+                    }
+                }
+            }
+
+            IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData);
+            try {
+                boolean indexDirty = processIndexMappingTasks(tasks, indexService, builder);
+                if (indexDirty) {
+                    mdBuilder.put(builder);
+                    dirty = true;
+                }
            } finally {
                if (removeIndex) {
                    indicesService.removeIndex(index, "created for mapping processing");
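The hunk above keeps the create-then-remove pattern for indices that are not allocated on the processing node: a throwaway IndexService is created just so mappings can be merged, and it is always removed in the finally block. A plain-Java sketch of that shape (lookup/create/remove and the IndexService interface here are hypothetical stand-ins, not the real API):

    import java.util.List;

    public class TempIndexFlow {
        interface IndexService { void merge(String type); }

        static IndexService lookup(String index) { return null; }  // pretend the index isn't allocated here
        static IndexService create(String index) { return type -> System.out.println("merged " + type); }
        static void remove(String index) { System.out.println("removed temp index " + index); }

        static void process(List<String> types, String index) {
            boolean removeIndex = false;
            IndexService service = lookup(index);
            if (service == null) {
                service = create(index);
                removeIndex = true; // created only for mapping processing
            }
            try {
                for (String type : types) {
                    service.merge(type);
                }
            } finally {
                if (removeIndex) {
                    remove(index);  // cleanup happens even if a merge throws
                }
            }
        }

        public static void main(String[] args) {
            process(List.of("type_a", "type_b"), "my_index");
        }
    }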
@@ -321,13 +263,86 @@ public void run() {
                 }
             });
 
         if (!dirty) {
             return currentState;
         }
         return ClusterState.builder(currentState).metaData(mdBuilder).build();
     }
 
+    private boolean processIndexMappingTasks(List<MappingTask> tasks, IndexService indexService, IndexMetaData.Builder builder) {
+        boolean dirty = false;
+        String index = indexService.index().name();
+        // keep track of what we already refreshed, no need to refresh it again...
+        Set<String> processedRefreshes = Sets.newHashSet();
+        for (MappingTask task : tasks) {
+            if (task instanceof RefreshTask) {
+                RefreshTask refreshTask = (RefreshTask) task;
+                try {
+                    List<String> updatedTypes = Lists.newArrayList();
+                    for (String type : refreshTask.types) {
+                        if (processedRefreshes.contains(type)) {
+                            continue;
+                        }
+                        DocumentMapper mapper = indexService.mapperService().documentMapper(type);
+                        if (mapper == null) {
+                            continue;
+                        }
+                        if (!mapper.mappingSource().equals(builder.mapping(type).source())) {
+                            updatedTypes.add(type);
+                            builder.putMapping(new MappingMetaData(mapper));
+                        }
+                        processedRefreshes.add(type);
+                    }
+                    if (updatedTypes.isEmpty()) {
+                        continue;
+                    }
+                    logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
+                    dirty = true;
+                } catch (Throwable t) {
+                    logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types);
+                }
+            } else if (task instanceof UpdateTask) {
+                UpdateTask updateTask = (UpdateTask) task;
+                try {
+                    String type = updateTask.type;
+                    CompressedString mappingSource = updateTask.mappingSource;
+
+                    MappingMetaData mappingMetaData = builder.mapping(type);
+                    if (mappingMetaData != null && mappingMetaData.source().equals(mappingSource)) {
+                        logger.debug("[{}] update_mapping [{}] ignoring mapping update task as its source is equal to ours", index, updateTask.type);
+                        continue;
+                    }
+
+                    DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource, false);
+                    processedRefreshes.add(type);
+
+                    // if we end up with the same mapping as the original once, ignore
+                    if (mappingMetaData != null && mappingMetaData.source().equals(updatedMapper.mappingSource())) {
+                        logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it results in the same source as what we have", index, updateTask.type);
+                        continue;
+                    }
+
+                    // build the updated mapping source
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource());
+                    } else if (logger.isInfoEnabled()) {
+                        logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
+                    }
+
+                    builder.putMapping(new MappingMetaData(updatedMapper));
+                    dirty = true;
+                } catch (Throwable t) {
+                    logger.warn("[{}] failed to update-mapping in cluster state, type [{}]", index, updateTask.type);
+                }
+            } else {
+                logger.warn("illegal state, got wrong mapping task type [{}]", task);
+            }
+        }
+        return dirty;
+    }
+
     /**
      * Refreshes mappings if they are not the same between original and parsed version
      */
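The essential property of the extracted helper is that the duplicate-source checks and the putMapping calls all go through the single builder passed in, so a task later in the batch observes what earlier tasks already merged. A minimal sketch of that flow (plain maps and string sources standing in for the real builder and CompressedString):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SharedBuilderFlow {
        static boolean processTasks(List<Map.Entry<String, String>> tasks, Map<String, String> builder) {
            boolean dirty = false;
            for (Map.Entry<String, String> task : tasks) {
                String type = task.getKey(), source = task.getValue();
                // compare against the builder, not the original snapshot, so the
                // result of earlier tasks in the same batch is visible here
                if (source.equals(builder.get(type))) {
                    continue; // already up to date, nothing to do
                }
                builder.put(type, source);
                dirty = true;
            }
            return dirty;
        }

        public static void main(String[] args) {
            Map<String, String> builder = new HashMap<>(Map.of("type_a", "{a_v1}"));
            boolean dirty = processTasks(List.of(
                    Map.entry("type_a", "{a_v2}"),
                    Map.entry("type_b", "{b_v1}"),
                    Map.entry("type_a", "{a_v2}")   // duplicate: skipped via the builder lookup
            ), builder);
            System.out.println(dirty + " " + builder); // true {type_a={a_v2}, type_b={b_v1}}
        }
    }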
@@ -655,13 +655,13 @@ public synchronized MergeResult merge(DocumentMapper mergeWith, MergeFlags merge
         return new MergeResult(mergeContext.buildConflicts());
     }
 
-    public void refreshSource() throws FailedToGenerateSourceMapperException {
+    public CompressedString refreshSource() throws FailedToGenerateSourceMapperException {
         try {
             XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
             builder.startObject();
             toXContent(builder, ToXContent.EMPTY_PARAMS);
             builder.endObject();
-            this.mappingSource = new CompressedString(builder.bytes());
+            return mappingSource = new CompressedString(builder.bytes());
         } catch (Exception e) {
             throw new FailedToGenerateSourceMapperException(e.getMessage(), e);
         }
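A plain-Java sketch of the assign-and-return idiom adopted above (the class, field, and buildSource() are stand-ins for DocumentMapper internals, not the real API): refreshing the cached source now also hands the fresh value back to the caller in one step.

    public class RefreshSourceSketch {
        private String mappingSource;

        public String refreshSource() {
            // rebuild, cache, and return in one expression, so callers can use
            // the refreshed source without a second field read
            return mappingSource = buildSource();
        }

        private String buildSource() {
            return "{\"properties\":{}}"; // placeholder for the real JSON serialization
        }

        public static void main(String[] args) {
            System.out.println(new RefreshSourceSketch().refreshSource());
        }
    }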