diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java b/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java index 76fee0e55fb83..105fd76399d80 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.deletebyquery; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest; import org.elasticsearch.common.Nullable; @@ -45,8 +46,11 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest< private Set routing; @Nullable private String[] filteringAliases; + private long nowInMillis; - IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set routing, @Nullable String[] filteringAliases) { + IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set routing, @Nullable String[] filteringAliases, + long nowInMillis + ) { this.index = index; this.timeout = request.timeout(); this.source = request.source(); @@ -55,6 +59,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest< this.consistencyLevel = request.consistencyLevel(); this.routing = routing; this.filteringAliases = filteringAliases; + this.nowInMillis = nowInMillis; } IndexDeleteByQueryRequest() { @@ -85,6 +90,10 @@ String[] filteringAliases() { return filteringAliases; } + long nowInMillis() { + return nowInMillis; + } + public IndexDeleteByQueryRequest timeout(TimeValue timeout) { this.timeout = timeout; return this; @@ -114,6 +123,11 @@ public void readFrom(StreamInput in) throws IOException { filteringAliases[i] = in.readString(); } } + if (in.getVersion().onOrAfter(Version.V_1_2_0)) { + nowInMillis = in.readVLong(); + } else { + nowInMillis = System.currentTimeMillis(); + } } public void writeTo(StreamOutput out) throws IOException { @@ -139,5 +153,8 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeVInt(0); } + if (out.getVersion().onOrAfter(Version.V_1_2_0)) { + out.writeVLong(nowInMillis); + } } } diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java b/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java index 7d42bc5389b0f..d3bcba009c870 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.deletebyquery; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.common.Nullable; @@ -47,6 +48,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest< private Set routing; @Nullable private String[] filteringAliases; + private long nowInMillis; ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) { super(request); @@ -59,6 +61,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest< timeout = request.timeout(); this.routing = request.routing(); filteringAliases = request.filteringAliases(); + nowInMillis = request.nowInMillis(); } ShardDeleteByQueryRequest() { @@ -93,6 +96,10 @@ public String[] filteringAliases() { return filteringAliases; } + long nowInMillis() { + return nowInMillis; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -113,6 +120,12 @@ public void readFrom(StreamInput in) throws IOException { filteringAliases[i] = in.readString(); } } + + if (in.getVersion().onOrAfter(Version.V_1_2_0)) { + nowInMillis = in.readVLong(); + } else { + nowInMillis = System.currentTimeMillis(); + } } @Override @@ -137,6 +150,9 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeVInt(0); } + if (out.getVersion().onOrAfter(Version.V_1_2_0)) { + out.writeVLong(nowInMillis); + } } @Override diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java index 9be61c5a75f9d..ee6eacaaef226 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java @@ -100,8 +100,8 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, DeleteByQu } @Override - protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index, Set routing) { + protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index, Set routing, long startTimeInMillis) { String[] filteringAliases = clusterService.state().metaData().filteringAliases(index, request.indices()); - return new IndexDeleteByQueryRequest(request, index, routing, filteringAliases); + return new IndexDeleteByQueryRequest(request, index, routing, filteringAliases, startTimeInMillis); } } diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java index 4dddb54987324..1092c0387c50a 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java @@ -115,7 +115,7 @@ protected PrimaryResponse IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId); - SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null, + SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null, indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays)); try { @@ -138,7 +138,7 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId); - SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null, + SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null, indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays)); try { diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java index 5c0dd26659978..53ea0522314a9 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java @@ -84,6 +84,7 @@ protected void doExecute(final Request request, final ActionListener l final AtomicInteger indexCounter = new AtomicInteger(); final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length); final AtomicReferenceArray indexResponses = new AtomicReferenceArray(concreteIndices.length); + final long startTimeInMillis = System.currentTimeMillis(); Map> routingMap = resolveRouting(clusterState, request); if (concreteIndices == null || concreteIndices.length == 0) { @@ -94,7 +95,7 @@ protected void doExecute(final Request request, final ActionListener l if (routingMap != null) { routing = routingMap.get(index); } - IndexRequest indexRequest = newIndexRequestInstance(request, index, routing); + IndexRequest indexRequest = newIndexRequestInstance(request, index, routing, startTimeInMillis); // no threading needed, all is done on the index replication one indexRequest.listenerThreaded(false); indexAction.execute(indexRequest, new ActionListener() { @@ -127,7 +128,7 @@ public void onFailure(Throwable e) { protected abstract String transportAction(); - protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set routing); + protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set routing, long startTimeInMillis); protected abstract boolean accumulateExceptions(); diff --git a/src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java b/src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java index f98a68b43bce9..fd39fdc53fc6b 100644 --- a/src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java +++ b/src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java @@ -142,4 +142,16 @@ public void testDeleteByFieldQuery() throws Exception { } + @Test + public void testDateMath() throws Exception { + index("test", "type", "1", "d", "2013-01-01"); + ensureGreen(); + refresh(); + assertHitCount(client().prepareCount("test").get(), 1); + client().prepareDeleteByQuery("test").setQuery(QueryBuilders.rangeQuery("d").to("now-1h")).get(); + refresh(); + assertHitCount(client().prepareCount("test").get(), 0); + } + + }