Skip to content

Commit

Permalink
Capture and set start time in Delete By Query operations
Browse files Browse the repository at this point in the history
This is important for queries/filters that use `now` in date based queries/filters

Closes #5540
  • Loading branch information
bleskes committed Mar 26, 2014
1 parent ab3e22d commit 196e3c3
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 7 deletions.
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.deletebyquery;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
import org.elasticsearch.common.Nullable;
Expand All @@ -45,8 +46,11 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<
private Set<String> routing;
@Nullable
private String[] filteringAliases;
private long nowInMillis;

IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set<String> routing, @Nullable String[] filteringAliases) {
IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set<String> routing, @Nullable String[] filteringAliases,
long nowInMillis
) {
this.index = index;
this.timeout = request.timeout();
this.source = request.source();
Expand All @@ -55,6 +59,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<
this.consistencyLevel = request.consistencyLevel();
this.routing = routing;
this.filteringAliases = filteringAliases;
this.nowInMillis = nowInMillis;
}

IndexDeleteByQueryRequest() {
Expand Down Expand Up @@ -85,6 +90,10 @@ String[] filteringAliases() {
return filteringAliases;
}

long nowInMillis() {
return nowInMillis;
}

public IndexDeleteByQueryRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
Expand Down Expand Up @@ -114,6 +123,11 @@ public void readFrom(StreamInput in) throws IOException {
filteringAliases[i] = in.readString();
}
}
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
nowInMillis = in.readVLong();
} else {
nowInMillis = System.currentTimeMillis();
}
}

public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -139,5 +153,8 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeVInt(0);
}
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeVLong(nowInMillis);
}
}
}
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.deletebyquery;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -47,6 +48,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
private Set<String> routing;
@Nullable
private String[] filteringAliases;
private long nowInMillis;

ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) {
super(request);
Expand All @@ -59,6 +61,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
timeout = request.timeout();
this.routing = request.routing();
filteringAliases = request.filteringAliases();
nowInMillis = request.nowInMillis();
}

ShardDeleteByQueryRequest() {
Expand Down Expand Up @@ -93,6 +96,10 @@ public String[] filteringAliases() {
return filteringAliases;
}

long nowInMillis() {
return nowInMillis;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -113,6 +120,12 @@ public void readFrom(StreamInput in) throws IOException {
filteringAliases[i] = in.readString();
}
}

if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
nowInMillis = in.readVLong();
} else {
nowInMillis = System.currentTimeMillis();
}
}

@Override
Expand All @@ -137,6 +150,9 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeVInt(0);
}
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeVLong(nowInMillis);
}
}

@Override
Expand Down
Expand Up @@ -100,8 +100,8 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, DeleteByQu
}

@Override
protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index, Set<String> routing) {
protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index, Set<String> routing, long startTimeInMillis) {
String[] filteringAliases = clusterService.state().metaData().filteringAliases(index, request.indices());
return new IndexDeleteByQueryRequest(request, index, routing, filteringAliases);
return new IndexDeleteByQueryRequest(request, index, routing, filteringAliases, startTimeInMillis);
}
}
Expand Up @@ -115,7 +115,7 @@ protected PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest>
IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);

SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null,
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null,
indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler,
pageCacheRecycler, bigArrays));
try {
Expand All @@ -138,7 +138,7 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);

SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null,
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null,
indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService,
cacheRecycler, pageCacheRecycler, bigArrays));
try {
Expand Down
Expand Up @@ -84,6 +84,7 @@ protected void doExecute(final Request request, final ActionListener<Response> l
final AtomicInteger indexCounter = new AtomicInteger();
final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length);
final AtomicReferenceArray<Object> indexResponses = new AtomicReferenceArray<Object>(concreteIndices.length);
final long startTimeInMillis = System.currentTimeMillis();

Map<String, Set<String>> routingMap = resolveRouting(clusterState, request);
if (concreteIndices == null || concreteIndices.length == 0) {
Expand All @@ -94,7 +95,7 @@ protected void doExecute(final Request request, final ActionListener<Response> l
if (routingMap != null) {
routing = routingMap.get(index);
}
IndexRequest indexRequest = newIndexRequestInstance(request, index, routing);
IndexRequest indexRequest = newIndexRequestInstance(request, index, routing, startTimeInMillis);
// no threading needed, all is done on the index replication one
indexRequest.listenerThreaded(false);
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
Expand Down Expand Up @@ -127,7 +128,7 @@ public void onFailure(Throwable e) {

protected abstract String transportAction();

protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set<String> routing);
protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set<String> routing, long startTimeInMillis);

protected abstract boolean accumulateExceptions();

Expand Down
Expand Up @@ -142,4 +142,16 @@ public void testDeleteByFieldQuery() throws Exception {

}

@Test
public void testDateMath() throws Exception {
index("test", "type", "1", "d", "2013-01-01");
ensureGreen();
refresh();
assertHitCount(client().prepareCount("test").get(), 1);
client().prepareDeleteByQuery("test").setQuery(QueryBuilders.rangeQuery("d").to("now-1h")).get();
refresh();
assertHitCount(client().prepareCount("test").get(), 0);
}


}

0 comments on commit 196e3c3

Please sign in to comment.