Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update API: Detect noop updates when using doc #6862

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/reference/docs/update.asciidoc
Expand Up @@ -109,6 +109,23 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
If both `doc` and `script` is specified, then `doc` is ignored. Best is
to put your field pairs of the partial document in the script itself.

By default if `doc` is specified then the document is always updated even
if the merging process doesn't cause any changes. Specifying `detect_noop`
as `true` will cause Elasticsearch to check if there are changes and, if
there aren't, turn the update request into a noop. For example:
[source,js]
--------------------------------------------------
curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
"doc" : {
"name" : "new_name"
},
"detect_noop": true
}'
--------------------------------------------------

If `name` was `new_name` before the request was sent then the entire update
request is ignored.

There is also support for `upsert`. If the document does
not already exists, the content of the `upsert` element will be used to
index the fresh doc:
Expand Down
Expand Up @@ -81,8 +81,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation

@Inject
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper) {
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper) {
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
this.mappingUpdatedAction = mappingUpdatedAction;
this.updateHelper = updateHelper;
Expand Down Expand Up @@ -552,6 +552,7 @@ private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRe
}
case NONE:
UpdateResponse updateResponse = translate.action();
indexShard.indexingService().noopUpdate(updateRequest.type());
return new UpdateResult(translate, updateResponse);
default:
throw new ElasticsearchIllegalStateException("Illegal update operation " + translate.operation());
Expand Down
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -68,16 +69,18 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;
private final UpdateHelper updateHelper;
private final IndicesService indicesService;

@Inject
public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportCreateIndexAction createIndexAction,
UpdateHelper updateHelper) {
UpdateHelper updateHelper, IndicesService indicesService) {
super(settings, threadPool, clusterService, transportService);
this.indexAction = indexAction;
this.deleteAction = deleteAction;
this.createIndexAction = createIndexAction;
this.updateHelper = updateHelper;
this.indicesService = indicesService;
this.autoCreateIndex = new AutoCreateIndex(settings);
}

Expand Down Expand Up @@ -284,6 +287,7 @@ public void run() {
case NONE:
UpdateResponse update = result.action();
listener.onResponse(update);
indicesService.indexService(request.index()).shard(request.shardId()).indexingService().noopUpdate(request.type());
break;
default:
throw new ElasticsearchIllegalStateException("Illegal operation " + result.operation());
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/org/elasticsearch/action/update/UpdateHelper.java
Expand Up @@ -140,7 +140,13 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
if (indexRequest.parent() != null) {
parent = indexRequest.parent();
}
XContentHelper.update(updatedSourceAsMap, indexRequest.sourceAsMap());
boolean noop = !XContentHelper.update(updatedSourceAsMap, indexRequest.sourceAsMap(), request.detectNoop());
// noop could still be true even if detectNoop isn't because update detects empty maps as noops. BUT we can only
// actually turn the update into a noop if detectNoop is true to preserve backwards compatibility and to handle
// cases where users repopulating multi-fields or adding synonyms, etc.
if (request.detectNoop() && noop) {
operation = "none";
}
} else {
Map<String, Object> ctx = new HashMap<>(2);
ctx.put("_source", sourceAndContent.v2());
Expand Down Expand Up @@ -196,7 +202,7 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
} else if ("none".equals(operation)) {
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
update.setGetResult(extractGetResult(request, getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, null));
update.setGetResult(extractGetResult(request, getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef()));
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
} else {
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script);
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/org/elasticsearch/action/update/UpdateRequest.java
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.update;

import com.google.common.collect.Maps;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -76,6 +77,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private IndexRequest upsertRequest;

private boolean docAsUpsert = false;
private boolean detectNoop = false;

@Nullable
private IndexRequest doc;
Expand Down Expand Up @@ -560,6 +562,19 @@ public UpdateRequest source(byte[] source, int offset, int length) throws Except
return source(new BytesArray(source, offset, length));
}

/**
* Should this update attempt to detect if it is a noop?
* @return this for chaining
*/
public UpdateRequest detectNoop(boolean detectNoop) {
this.detectNoop = detectNoop;
return this;
}

public boolean detectNoop() {
return detectNoop;
}

public UpdateRequest source(BytesReference source) throws Exception {
XContentType xContentType = XContentFactory.xContentType(source);
try (XContentParser parser = XContentFactory.xContent(xContentType).createParser(source)) {
Expand Down Expand Up @@ -587,6 +602,8 @@ public UpdateRequest source(BytesReference source) throws Exception {
safeDoc().source(docBuilder);
} else if ("doc_as_upsert".equals(currentFieldName)) {
docAsUpsert(parser.booleanValue());
} else if ("detect_noop".equals(currentFieldName)) {
detectNoop(parser.booleanValue());
}
}
}
Expand Down Expand Up @@ -635,6 +652,9 @@ public void readFrom(StreamInput in) throws IOException {
docAsUpsert = in.readBoolean();
version = Versions.readVersion(in);
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
detectNoop = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -684,6 +704,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(docAsUpsert);
Versions.writeVersion(version, out);
out.writeByte(versionType.getValue());
if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
out.writeBoolean(detectNoop);
}
}

}
Expand Up @@ -392,6 +392,14 @@ public UpdateRequestBuilder setDocAsUpsert(boolean shouldUpsertDoc) {
return this;
}

/**
* Sets whether to perform extra effort to detect noop updates via docAsUpsert.
*/
public UpdateRequestBuilder setDetectNoop(boolean detectNoop) {
request.detectNoop(detectNoop);
return this;
}

@Override
protected void doExecute(ActionListener<UpdateResponse> listener) {
client.update(request, listener);
Expand Down
Expand Up @@ -184,22 +184,40 @@ public static String convertToJson(byte[] data, int offset, int length, boolean
/**
* Updates the provided changes into the source. If the key exists in the changes, it overrides the one in source
* unless both are Maps, in which case it recuersively updated it.
* @param source the original map to be updated
* @param changes the changes to update into updated
* @param checkUpdatesAreUnequal should this method check if updates to the same key (that are not both maps) are
* unequal? This is just a .equals check on the objects, but that can take some time on long strings.
* @return true if the source map was modified
*/
public static void update(Map<String, Object> source, Map<String, Object> changes) {
public static boolean update(Map<String, Object> source, Map<String, Object> changes, boolean checkUpdatesAreUnequal) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be cleaner to always return if the doc has been modified and then to check externally if we want to update a document that has not been modified?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought passing in the flag would be better because it allows us to avoid the equality check if we don't need it. I imagine it'd really only matter for long strings but I also imagine this method receives long strings.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh that's a good reason for having such a parameter! Can you add a comment about that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely.

boolean modified = false;
for (Map.Entry<String, Object> changesEntry : changes.entrySet()) {
if (!source.containsKey(changesEntry.getKey())) {
// safe to copy, change does not exist in source
source.put(changesEntry.getKey(), changesEntry.getValue());
} else {
if (source.get(changesEntry.getKey()) instanceof Map && changesEntry.getValue() instanceof Map) {
// recursive merge maps
update((Map<String, Object>) source.get(changesEntry.getKey()), (Map<String, Object>) changesEntry.getValue());
} else {
// update the field
source.put(changesEntry.getKey(), changesEntry.getValue());
}
modified = true;
continue;
}
Object old = source.get(changesEntry.getKey());
if (old instanceof Map && changesEntry.getValue() instanceof Map) {
// recursive merge maps
modified |= update((Map<String, Object>) source.get(changesEntry.getKey()),
(Map<String, Object>) changesEntry.getValue(), checkUpdatesAreUnequal && !modified);
continue;
}
// update the field
source.put(changesEntry.getKey(), changesEntry.getValue());
if (modified) {
continue;
}
if (!checkUpdatesAreUnequal || old == null) {
modified = true;
continue;
}
modified = !old.equals(changesEntry.getValue());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

|= ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one I caught when I did the weighting and it looks like reverting that reverted my catch. Or I'm just remembering things. Anyway, I'll fix it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just finishing up - this |= isn't required because we continue above if modified is true so we can be super duper lazy with the .equals method.

}
return modified;
}

/**
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.indexing;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -46,17 +47,20 @@ public static class Stats implements Streamable, ToXContent {
private long deleteTimeInMillis;
private long deleteCurrent;

private long noopUpdateCount;

Stats() {

}

public Stats(long indexCount, long indexTimeInMillis, long indexCurrent, long deleteCount, long deleteTimeInMillis, long deleteCurrent) {
public Stats(long indexCount, long indexTimeInMillis, long indexCurrent, long deleteCount, long deleteTimeInMillis, long deleteCurrent, long noopUpdateCount) {
this.indexCount = indexCount;
this.indexTimeInMillis = indexTimeInMillis;
this.indexCurrent = indexCurrent;
this.deleteCount = deleteCount;
this.deleteTimeInMillis = deleteTimeInMillis;
this.deleteCurrent = deleteCurrent;
this.noopUpdateCount = noopUpdateCount;
}

public void add(Stats stats) {
Expand All @@ -67,6 +71,8 @@ public void add(Stats stats) {
deleteCount += stats.deleteCount;
deleteTimeInMillis += stats.deleteTimeInMillis;
deleteCurrent += stats.deleteCurrent;

noopUpdateCount += stats.noopUpdateCount;
}

public long getIndexCount() {
Expand Down Expand Up @@ -101,6 +107,10 @@ public long getDeleteCurrent() {
return deleteCurrent;
}

public long getNoopUpdateCount() {
return noopUpdateCount;
}

public static Stats readStats(StreamInput in) throws IOException {
Stats stats = new Stats();
stats.readFrom(in);
Expand All @@ -116,6 +126,10 @@ public void readFrom(StreamInput in) throws IOException {
deleteCount = in.readVLong();
deleteTimeInMillis = in.readVLong();
deleteCurrent = in.readVLong();

if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
noopUpdateCount = in.readVLong();
}
}

@Override
Expand All @@ -127,6 +141,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(deleteCount);
out.writeVLong(deleteTimeInMillis);
out.writeVLong(deleteCurrent);

if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
out.writeVLong(noopUpdateCount);
}
}

@Override
Expand All @@ -139,6 +157,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.timeValueField(Fields.DELETE_TIME_IN_MILLIS, Fields.DELETE_TIME, deleteTimeInMillis);
builder.field(Fields.DELETE_CURRENT, deleteCurrent);

builder.field(Fields.NOOP_UPDATE_TOTAL, noopUpdateCount);

return builder;
}
}
Expand Down Expand Up @@ -218,6 +238,7 @@ static final class Fields {
static final XContentBuilderString DELETE_TIME = new XContentBuilderString("delete_time");
static final XContentBuilderString DELETE_TIME_IN_MILLIS = new XContentBuilderString("delete_time_in_millis");
static final XContentBuilderString DELETE_CURRENT = new XContentBuilderString("delete_current");
static final XContentBuilderString NOOP_UPDATE_TOTAL = new XContentBuilderString("noop_update_total");
}

public static IndexingStats readIndexingStats(StreamInput in) throws IOException {
Expand Down
Expand Up @@ -218,6 +218,11 @@ public void postDeleteByQuery(Engine.DeleteByQuery deleteByQuery) {
}
}

public void noopUpdate(String type) {
totalStats.noopUpdates.inc();
typeStats(type).noopUpdates.dec();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you mean .inc ?

}

public void clear() {
totalStats.clear();
synchronized (this) {
Expand Down Expand Up @@ -253,11 +258,13 @@ static class StatsHolder {
public final MeanMetric deleteMetric = new MeanMetric();
public final CounterMetric indexCurrent = new CounterMetric();
public final CounterMetric deleteCurrent = new CounterMetric();
public final CounterMetric noopUpdates = new CounterMetric();

public IndexingStats.Stats stats() {
return new IndexingStats.Stats(
indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(),
deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count());
deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(),
noopUpdates.count());
}

public long totalCurrent() {
Expand Down