diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 0584672fa1c..5f8bd2771d5 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -63,6 +63,8 @@ Improvements * GITHUB#3666: Removing redundant check if field exists in TextToVectorUpdateProcessorFactory (Renato Haeberli via Alessandro Benedetti) +* SOLR-17926: Improve tracking of time already spent to discount the limit for sub-requests when `timeAllowed` is used. (Andrzej Bialecki, hossman) + Optimizations --------------------- * SOLR-17568: The CLI bin/solr export tool now contacts the appropriate nodes directly for data instead of proxying through one. diff --git a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java index 610c057a942..9deb2f9f4d0 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java @@ -758,14 +758,16 @@ protected void groupedFinishStage(final ResponseBuilder rb) { protected void regularFinishStage(ResponseBuilder rb) { // We may not have been able to retrieve all the docs due to an // index change. Remove any null documents. - for (Iterator iter = rb.getResponseDocs().iterator(); iter.hasNext(); ) { + SolrDocumentList responseDocs = + rb.getResponseDocs() != null ? rb.getResponseDocs() : new SolrDocumentList(); + for (Iterator iter = responseDocs.iterator(); iter.hasNext(); ) { if (iter.next() == null) { iter.remove(); - rb.getResponseDocs().setNumFound(rb.getResponseDocs().getNumFound() - 1); + rb.getResponseDocs().setNumFound(responseDocs.getNumFound() - 1); } } - rb.rsp.addResponse(rb.getResponseDocs()); + rb.rsp.addResponse(responseDocs); if (null != rb.getNextCursorMark()) { rb.rsp.add(CursorMarkParams.CURSOR_MARK_NEXT, rb.getNextCursorMark().getSerializedTotem()); } @@ -1339,6 +1341,9 @@ protected void createRetrieveDocs(ResponseBuilder rb) { // for each shard, collect the documents for that shard. 
HashMap<String, Collection<ShardDoc>> shardMap = new HashMap<>(); + if (rb.resultIds == null) { + rb.resultIds = new HashMap<>(); + } for (ShardDoc sdoc : rb.resultIds.values()) { Collection<ShardDoc> shardDocs = shardMap.get(sdoc.shard); if (shardDocs == null) { diff --git a/solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java b/solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java index 431c11197b9..c759ae3204a 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java +++ b/solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java @@ -137,6 +137,25 @@ public int getStage() { return this.stage; } + public String getStageName() { + switch (this.stage) { + case STAGE_START: + return "START"; + case STAGE_PARSE_QUERY: + return "PARSE_QUERY"; + case STAGE_TOP_GROUPS: + return "TOP_GROUPS"; + case STAGE_EXECUTE_QUERY: + return "EXECUTE_QUERY"; + case STAGE_GET_FIELDS: + return "GET_FIELDS"; + case STAGE_DONE: + return "DONE"; + default: + return Integer.toString(this.stage); + } + } + public void setStage(int stage) { this.stage = stage; } diff --git a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java index ed2e71e632a..36437d68eb9 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java @@ -72,6 +72,7 @@ import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.search.CursorMark; +import org.apache.solr.search.QueryLimits; import org.apache.solr.search.SortSpec; import org.apache.solr.search.facet.FacetModule; import org.apache.solr.security.AuthorizationContext; @@ -524,7 +525,7 @@ private void processComponents( } } } catch (ExitableDirectoryReader.ExitingReaderException ex) { - log.warn("Query: {}; ", req.getParamString(), ex); + log.warn("Query terminated: {}; ", req.getParamString(), ex); shortCircuitedResults(req, rb); } } else { @@ -563,6 +564,8 @@ private void processComponents( // presume we'll get a response from each shard we send to sreq.responses = new ArrayList<>(sreq.actualShards.length); + QueryLimits queryLimits = QueryLimits.getCurrentLimits(); + // TODO: map from shard to address[] for (String shard : sreq.actualShards) { ModifiableSolrParams params = new ModifiableSolrParams(sreq.params); @@ -586,6 +589,18 @@ private void processComponents( params.set(CommonParams.QT, reqPath); } // else if path is /select, then the qt gets passed thru if set } + if (queryLimits.isLimitsEnabled()) { + if (queryLimits.adjustShardRequestLimits(sreq, shard, params, rb)) { + // Skip this shard since one or more limits will be tripped + if (log.isDebugEnabled()) { + log.debug( + "Skipping request to shard '{}' due to query limits, params {}", + shard, + params); + } + continue; + } + } shardHandler1.submit(sreq, shard, params); } } diff --git a/solr/core/src/java/org/apache/solr/search/QueryLimit.java b/solr/core/src/java/org/apache/solr/search/QueryLimit.java index 1043707f06b..a72c466783f 100644 --- a/solr/core/src/java/org/apache/solr/search/QueryLimit.java +++ b/solr/core/src/java/org/apache/solr/search/QueryLimit.java @@ -17,6 +17,8 @@ package org.apache.solr.search; import org.apache.lucene.index.QueryTimeout; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.component.ShardRequest; public interface QueryLimit extends QueryTimeout { /** @@
-28,4 +30,16 @@ public interface QueryLimit extends QueryTimeout { * user. */ Object currentValue(); + + /** + * Allow limit to adjust shard request parameters if needed. + * + * @return true if the shard request should be skipped because a limit will be tripped after + * sending, during execution. + */ + default boolean adjustShardRequestLimit( + ShardRequest sreq, String shard, ModifiableSolrParams params) { + // default is to do nothing and process the request + return false; + } } diff --git a/solr/core/src/java/org/apache/solr/search/QueryLimits.java b/solr/core/src/java/org/apache/solr/search/QueryLimits.java index 60d27f06128..35ea4533e93 100644 --- a/solr/core/src/java/org/apache/solr/search/QueryLimits.java +++ b/solr/core/src/java/org/apache/solr/search/QueryLimits.java @@ -18,6 +18,7 @@ import static org.apache.solr.response.SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY; import static org.apache.solr.search.CpuAllowedLimit.hasCpuLimit; +import static org.apache.solr.search.MemAllowedLimit.hasMemLimit; import static org.apache.solr.search.TimeAllowedLimit.hasTimeLimit; import java.util.ArrayList; @@ -27,6 +28,9 @@ import org.apache.lucene.index.QueryTimeout; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.component.ResponseBuilder; +import org.apache.solr.handler.component.ShardRequest; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrRequestInfo; import org.apache.solr.response.SolrQueryResponse; @@ -71,7 +75,7 @@ public QueryLimits(SolrQueryRequest req, SolrQueryResponse rsp) { if (hasCpuLimit(req)) { limits.add(new CpuAllowedLimit(req)); } - if (MemAllowedLimit.hasMemLimit(req)) { + if (hasMemLimit(req)) { limits.add(new MemAllowedLimit(req)); } } @@ -105,6 +109,18 @@ public String formatExceptionMessage(String label) { + limitStatusMessage(); } + public String formatShardLimitExceptionMessage(String label, String shard, QueryLimit limit) { + return "Limit exceeded before sub-request!" + + (label != null ? " (" + label + ")" : "") + + ": [" + + limit.getClass().getSimpleName() + + ": " + + limit.currentValue() + + "] on shard " + + shard + + ". This shard request will be skipped."; + } + /** * If limit is reached then depending on the request param {@link CommonParams#PARTIAL_RESULTS} * either mark it as partial result in the response and signal the caller to return, or throw an @@ -118,21 +134,21 @@ public String formatExceptionMessage(String label) { public boolean maybeExitWithPartialResults(Supplier<String> label) throws QueryLimitsExceededException { if (isLimitsEnabled() && shouldExit()) { - if (allowPartialResults) { - if (rsp != null) { - SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo(); - if (requestInfo == null) { - throw new SolrException( - SolrException.ErrorCode.SERVER_ERROR, - "No request active, but attempting to exit with partial results?"); - } - rsp.setPartialResults(requestInfo.getReq()); - if (rsp.getResponseHeader().get(RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY) == null) { - // don't want to add duplicate keys. Although technically legal, there's a strong risk - // that clients won't anticipate it and break.
- rsp.addPartialResponseDetail(formatExceptionMessage(label.get())); - } + if (rsp != null) { + SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo(); + if (requestInfo == null) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "No request active, but attempting to exit with partial results?"); } + rsp.setPartialResults(requestInfo.getReq()); + if (rsp.getResponseHeader().get(RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY) == null) { + // don't want to add duplicate keys. Although technically legal, there's a strong risk + // that clients won't anticipate it and break. + rsp.addPartialResponseDetail(formatExceptionMessage(label.get())); + } + } + if (allowPartialResults) { return true; } else { throw new QueryLimitsExceededException(formatExceptionMessage(label.get())); @@ -185,6 +201,44 @@ public boolean isLimitsEnabled() { return !limits.isEmpty(); } + /** + * Allow each limit to adjust the shard request parameters if needed. This is useful for + * timeAllowed, cpu, or memory limits that need to be propagated to shards with potentially + * modified values. + * + * @return true if the shard request should be skipped because one or more limits would be tripped + * after sending, during execution. + * @throws QueryLimitsExceededException if {@link #allowPartialResults} is false and limits would + * have been tripped by the shard request. + */ + public boolean adjustShardRequestLimits( + ShardRequest sreq, String shard, ModifiableSolrParams params, ResponseBuilder rb) + throws QueryLimitsExceededException { + boolean result = false; + for (QueryLimit limit : limits) { + boolean shouldSkip = limit.adjustShardRequestLimit(sreq, shard, params); + if (shouldSkip) { + String label = "stage: " + rb.getStageName(); + if (rsp != null) { + SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo(); + if (requestInfo == null) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "No request active, but attempting to exit with partial results?"); + } + rsp.setPartialResults(requestInfo.getReq()); + rsp.addPartialResponseDetail(formatShardLimitExceptionMessage(label, shard, limit)); + } + if (!allowPartialResults) { + throw new QueryLimitsExceededException( + formatShardLimitExceptionMessage(label, shard, limit)); + } + } + result = result || shouldSkip; + } + return result; + } + /** * Helper method to retrieve the current QueryLimits from {@link SolrRequestInfo#getRequestInfo()} * if it exists, otherwise it returns {@link #NONE}. diff --git a/solr/core/src/java/org/apache/solr/search/TimeAllowedLimit.java b/solr/core/src/java/org/apache/solr/search/TimeAllowedLimit.java index f837712176c..fb54d501d64 100644 --- a/solr/core/src/java/org/apache/solr/search/TimeAllowedLimit.java +++ b/solr/core/src/java/org/apache/solr/search/TimeAllowedLimit.java @@ -18,19 +18,38 @@ import static java.lang.System.nanoTime; +import java.lang.invoke.MethodHandles; import java.util.concurrent.TimeUnit; import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.component.ShardRequest; import org.apache.solr.request.SolrQueryRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Enforces a wall clock based timeout on a given SolrQueryRequest. This class holds the logic for * the {@code timeAllowed} query parameter. Note that timeAllowed will be ignored for * local processing of sub-queries in cases where the parent query already * has {@code timeAllowed} set. 
Essentially only one timeAllowed can be specified for any thread - * executing a query. This is to ensure that subqueries don't escape from the intended limit + * executing a query. This is to ensure that subqueries don't escape from the intended limit. + * + * <p>
Distributed requests will approximately track the original starting point of the parent + * request. Shard requests may be skipped if the limit would run out shortly after they are sent - + * this in-flight allowance is determined by {@link #INFLIGHT_PARAM} in milliseconds, with the + * default value of {@link #DEFAULT_INFLIGHT_MS}. */ public class TimeAllowedLimit implements QueryLimit { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final String USED_PARAM = "_taUsed"; + public static final String INFLIGHT_PARAM = "timeAllowed.inflight"; + /** Arbitrary small amount of time to account for network flight time in ms */ + public static final long DEFAULT_INFLIGHT_MS = 2L; + + private final long reqTimeAllowedMs; + private final long reqInflightMs; private final long timeoutAt; private final long timingSince; @@ -43,19 +62,46 @@ public class TimeAllowedLimit implements QueryLimit { * object */ public TimeAllowedLimit(SolrQueryRequest req) { - // reduce by time already spent - long reqTimeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L); + // original timeAllowed in milliseconds + reqTimeAllowedMs = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L); - if (reqTimeAllowed == -1L) { + if (reqTimeAllowedMs == -1L) { throw new IllegalArgumentException( "Check for limit with hasTimeLimit(req) before creating a TimeAllowedLimit"); } - long timeAlreadySpent = (long) req.getRequestTimer().getTime(); - long now = nanoTime(); - long timeAllowed = reqTimeAllowed - timeAlreadySpent; - long nanosAllowed = TimeUnit.NANOSECONDS.convert(timeAllowed, TimeUnit.MILLISECONDS); - timeoutAt = now + nanosAllowed; - timingSince = now - timeAlreadySpent; + // time already spent locally before this limit was initialized, in milliseconds + long timeAlreadySpentMs = (long) req.getRequestTimer().getTime(); + long parentUsedMs = req.getParams().getLong(USED_PARAM, -1L); + reqInflightMs = req.getParams().getLong(INFLIGHT_PARAM, DEFAULT_INFLIGHT_MS); + if (parentUsedMs != -1L) { + // this is a sub-request of a request that already had timeAllowed set. + // We have to deduct the time already used by the parent request. + log.debug("parentUsedMs: {}", parentUsedMs); + timeAlreadySpentMs += parentUsedMs; + } + long nowNs = nanoTime(); + long remainingTimeAllowedMs = reqTimeAllowedMs - timeAlreadySpentMs; + log.debug("remainingTimeAllowedMs: {}", remainingTimeAllowedMs); + long remainingTimeAllowedNs = TimeUnit.MILLISECONDS.toNanos(remainingTimeAllowedMs); + timeoutAt = nowNs + remainingTimeAllowedNs; + timingSince = nowNs - TimeUnit.MILLISECONDS.toNanos(timeAlreadySpentMs); + } + + @Override + public boolean adjustShardRequestLimit( + ShardRequest sreq, String shard, ModifiableSolrParams params) { + long usedTimeAllowedMs = TimeUnit.NANOSECONDS.toMillis(nanoTime() - timingSince); + // increase by the expected in-flight time + usedTimeAllowedMs += reqInflightMs; + boolean result = false; + if (usedTimeAllowedMs >= reqTimeAllowedMs) { + // there's no point in sending this request to the shard because the time will run out + // before it's processed at the target + result = true; + } + params.set(USED_PARAM, Long.toString(usedTimeAllowedMs)); + log.debug("adjustShardRequestLimit: used {} ms (skip? {})", usedTimeAllowedMs, result); + return result; } /** Return true if the current request has a parameter with a valid value of the limit. 
*/ @@ -69,6 +115,7 @@ public boolean shouldExit() { return timeoutAt - nanoTime() < 0L; } + /** Return elapsed time in nanoseconds since this limit was started. */ @Override public Object currentValue() { return nanoTime() - timingSince; diff --git a/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/TopGroupsShardRequestFactory.java b/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/TopGroupsShardRequestFactory.java index 22127f76dfc..21aa2494e6d 100644 --- a/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/TopGroupsShardRequestFactory.java +++ b/solr/core/src/java/org/apache/solr/search/grouping/distributed/requestfactory/TopGroupsShardRequestFactory.java @@ -132,12 +132,6 @@ private ShardRequest[] createRequest(ResponseBuilder rb, String[] shards) { sreq.params.set(CommonParams.FL, schema.getUniqueKeyField().getName()); } - int origTimeAllowed = sreq.params.getInt(CommonParams.TIME_ALLOWED, -1); - if (origTimeAllowed > 0) { - sreq.params.set( - CommonParams.TIME_ALLOWED, Math.max(1, origTimeAllowed - rb.firstPhaseElapsedTime)); - } - return new ShardRequest[] {sreq}; } } diff --git a/solr/core/src/test/org/apache/solr/search/TestQueryLimits.java b/solr/core/src/test/org/apache/solr/search/TestQueryLimits.java index decc52da7a7..690b05e3f21 100644 --- a/solr/core/src/test/org/apache/solr/search/TestQueryLimits.java +++ b/solr/core/src/test/org/apache/solr/search/TestQueryLimits.java @@ -22,8 +22,11 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.util.TestInjection; import org.apache.solr.util.ThreadCpuTimer; +import org.junit.After; import org.junit.BeforeClass; import org.junit.Test; @@ -34,7 +37,7 @@ public class TestQueryLimits extends SolrCloudTestCase { @BeforeClass public static void setupCluster() throws Exception { System.setProperty(ThreadCpuTimer.ENABLE_CPU_TIME, "true"); - configureCluster(1).addConfig("conf", configset("cloud-minimal")).configure(); + configureCluster(4).addConfig("conf", configset("exitable-directory")).configure(); SolrClient solrClient = cluster.getSolrClient(); CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLLECTION, "conf", 3, 2); @@ -54,6 +57,11 @@ public static void setupCluster() throws Exception { solrClient.commit(COLLECTION); } + @After + public void teardown() { + TestInjection.queryTimeout = null; + } + // TODO: add more tests and better assertions once SOLR-17151 / SOLR-17158 is done @Test public void testQueryLimits() throws Exception { @@ -98,4 +106,53 @@ public void testQueryLimits() throws Exception { assertTrue("call count should be > 0", callCounts.get(matchingExpr) > 0); } } + + @Test + public void testAdjustShardRequestLimits() throws Exception { + SolrClient solrClient = cluster.getSolrClient(); + String timeAllowed = "500"; // ms + ModifiableSolrParams params = + params( + "q", + "id:*", + "cache", + "false", + "group", + "true", + "group.field", + "val_i", + "timeAllowed", + timeAllowed, + "sleep", + "100"); + QueryResponse rsp = solrClient.query(COLLECTION, params); + assertNull("should have full results: " + rsp.jsonStr(), rsp.getHeader().get("partialResults")); + + // reduce timeAllowed to force partial results + params.set("timeAllowed", "100"); + // 
pretend this is a request with some time already used + params.set(TimeAllowedLimit.USED_PARAM, "60"); + // set a high in-flight allowance to trigger skipping shard requests + params.set(TimeAllowedLimit.INFLIGHT_PARAM, "50"); + QueryResponse rsp1 = solrClient.query(COLLECTION, params); + assertNotNull( + "should have partial results: " + rsp1.jsonStr(), rsp1.getHeader().get("partialResults")); + assertEquals( + "partialResults should be true", "true", rsp1.getHeader().get("partialResults").toString()); + assertTrue( + "partialResultsDetails should contain 'skipped': " + rsp1.jsonStr(), + rsp1.getHeader().get("partialResultsDetails").toString().contains("skipped")); + + params.set(CommonParams.PARTIAL_RESULTS, false); + QueryResponse rsp2 = solrClient.query(COLLECTION, params); + assertNotNull( + "partialResults header should be present: " + rsp2.jsonStr(), rsp2.getHeader().get("partialResults")); + assertEquals( + "partialResults should be omitted: " + rsp2.jsonStr(), + "omitted", + rsp2.getHeader().get("partialResults").toString()); + assertTrue( + "partialResultsDetails should contain 'skipped': " + rsp2.jsonStr(), + rsp2.getHeader().get("partialResultsDetails").toString().contains("skipped")); + } } diff --git a/solr/solr-ref-guide/modules/query-guide/pages/common-query-parameters.adoc b/solr/solr-ref-guide/modules/query-guide/pages/common-query-parameters.adoc index 8809229fbb3..49c418fd749 100644 --- a/solr/solr-ref-guide/modules/query-guide/pages/common-query-parameters.adoc +++ b/solr/solr-ref-guide/modules/query-guide/pages/common-query-parameters.adoc @@ -382,6 +382,10 @@ As this check is periodically performed, the actual time for which a request can If the request consumes more time in other stages, custom components, etc., this parameter is not expected to abort the request. Regular search and the JSON Facet component abandon requests in accordance with this parameter. +When `timeAllowed` is specified in a distributed and/or multi-phase search, sub-requests will approximately track the same +wall clock starting point as the original request. Shard requests may be skipped if the limit would run out shortly after they are +sent; this in-flight allowance is configured in milliseconds with the `timeAllowed.inflight` parameter (default: 2ms). + == cpuAllowed Parameter [%autowidth,frame=none]
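For reviewers, here is a client-side sketch (not part of the patch) of how the new behavior is expected to be exercised. The parameter names (`timeAllowed`, `timeAllowed.inflight`) and response-header keys (`partialResults`, `partialResultsDetails`) come from `TimeAllowedLimit` and the test above; the collection name `techproducts` and the concrete millisecond values are assumptions for illustration only.

```java
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.ModifiableSolrParams;

public class TimeAllowedInflightExample {
  /**
   * Illustrative only: issues a distributed query with a wall-clock budget and an explicit
   * in-flight allowance, then inspects the partial-results markers that appear when shard
   * sub-requests are skipped.
   */
  static QueryResponse queryWithTimeBudget(SolrClient client) throws Exception {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("q", "*:*");
    // Overall wall-clock budget for the whole request, in milliseconds.
    params.set("timeAllowed", "100");
    // Assumed in-flight allowance: a shard sub-request is skipped when the remaining budget
    // would be exhausted within this many milliseconds of being sent.
    params.set("timeAllowed.inflight", "5");
    QueryResponse rsp = client.query("techproducts", params);
    // When sub-requests were skipped, the header carries "partialResults" plus a
    // "partialResultsDetails" message naming the skipped shard.
    System.out.println("partialResults=" + rsp.getHeader().get("partialResults"));
    System.out.println("partialResultsDetails=" + rsp.getHeader().get("partialResultsDetails"));
    return rsp;
  }
}
```

In this sketch, a request whose remaining budget at fan-out time is smaller than the in-flight allowance has its shard sub-requests skipped and the response marked partial, which is the scenario `testAdjustShardRequestLimits` drives with `_taUsed` and a large `timeAllowed.inflight` value.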