Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture and set start time in Delete By Query operations #5540

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.deletebyquery;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
import org.elasticsearch.common.Nullable;
Expand All @@ -45,8 +46,11 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<
private Set<String> routing;
@Nullable
private String[] filteringAliases;
private long nowInMillis;

IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set<String> routing, @Nullable String[] filteringAliases) {
IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set<String> routing, @Nullable String[] filteringAliases,
long nowInMillis
) {
this.index = index;
this.timeout = request.timeout();
this.source = request.source();
Expand All @@ -55,6 +59,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<
this.consistencyLevel = request.consistencyLevel();
this.routing = routing;
this.filteringAliases = filteringAliases;
this.nowInMillis = nowInMillis;
}

IndexDeleteByQueryRequest() {
Expand Down Expand Up @@ -85,6 +90,10 @@ String[] filteringAliases() {
return filteringAliases;
}

long nowInMillis() {
return nowInMillis;
}

public IndexDeleteByQueryRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
Expand Down Expand Up @@ -114,6 +123,11 @@ public void readFrom(StreamInput in) throws IOException {
filteringAliases[i] = in.readString();
}
}
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
nowInMillis = in.readVLong();
} else {
nowInMillis = System.currentTimeMillis();
}
}

public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -139,5 +153,8 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeVInt(0);
}
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeVLong(nowInMillis);
}
}
}
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.deletebyquery;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -47,6 +48,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
private Set<String> routing;
@Nullable
private String[] filteringAliases;
private long nowInMillis;

ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) {
super(request);
Expand All @@ -59,6 +61,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
timeout = request.timeout();
this.routing = request.routing();
filteringAliases = request.filteringAliases();
nowInMillis = request.nowInMillis();
}

ShardDeleteByQueryRequest() {
Expand Down Expand Up @@ -93,6 +96,10 @@ public String[] filteringAliases() {
return filteringAliases;
}

long nowInMillis() {
return nowInMillis;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -113,6 +120,12 @@ public void readFrom(StreamInput in) throws IOException {
filteringAliases[i] = in.readString();
}
}

if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
nowInMillis = in.readVLong();
} else {
nowInMillis = System.currentTimeMillis();
}
}

@Override
Expand All @@ -137,6 +150,9 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeVInt(0);
}
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeVLong(nowInMillis);
}
}

@Override
Expand Down
Expand Up @@ -100,8 +100,8 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, DeleteByQu
}

@Override
protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index, Set<String> routing) {
protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index, Set<String> routing, long startTimeInMillis) {
String[] filteringAliases = clusterService.state().metaData().filteringAliases(index, request.indices());
return new IndexDeleteByQueryRequest(request, index, routing, filteringAliases);
return new IndexDeleteByQueryRequest(request, index, routing, filteringAliases, startTimeInMillis);
}
}
Expand Up @@ -115,7 +115,7 @@ protected PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest>
IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);

SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null,
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null,
indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler,
pageCacheRecycler, bigArrays));
try {
Expand All @@ -138,7 +138,7 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);

SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null,
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null,
indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService,
cacheRecycler, pageCacheRecycler, bigArrays));
try {
Expand Down
Expand Up @@ -84,6 +84,7 @@ protected void doExecute(final Request request, final ActionListener<Response> l
final AtomicInteger indexCounter = new AtomicInteger();
final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length);
final AtomicReferenceArray<Object> indexResponses = new AtomicReferenceArray<Object>(concreteIndices.length);
final long startTimeInMillis = System.currentTimeMillis();

Map<String, Set<String>> routingMap = resolveRouting(clusterState, request);
if (concreteIndices == null || concreteIndices.length == 0) {
Expand All @@ -94,7 +95,7 @@ protected void doExecute(final Request request, final ActionListener<Response> l
if (routingMap != null) {
routing = routingMap.get(index);
}
IndexRequest indexRequest = newIndexRequestInstance(request, index, routing);
IndexRequest indexRequest = newIndexRequestInstance(request, index, routing, startTimeInMillis);
// no threading needed, all is done on the index replication one
indexRequest.listenerThreaded(false);
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
Expand Down Expand Up @@ -127,7 +128,7 @@ public void onFailure(Throwable e) {

protected abstract String transportAction();

protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set<String> routing);
protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set<String> routing, long startTimeInMillis);

protected abstract boolean accumulateExceptions();

Expand Down
Expand Up @@ -142,4 +142,16 @@ public void testDeleteByFieldQuery() throws Exception {

}

@Test
public void testDateMath() throws Exception {
index("test", "type", "1", "d", "2013-01-01");
ensureGreen();
refresh();
assertHitCount(client().prepareCount("test").get(), 1);
client().prepareDeleteByQuery("test").setQuery(QueryBuilders.rangeQuery("d").to("now-1h")).get();
refresh();
assertHitCount(client().prepareCount("test").get(), 0);
}


}