2 changes: 2 additions & 0 deletions solr/CHANGES.txt
@@ -63,6 +63,8 @@ Improvements

* GITHUB#3666: Removing redundant check if field exists in TextToVectorUpdateProcessorFactory (Renato Haeberli via Alessandro Benedetti)

* SOLR-17926: Improve tracking of time already spent so that the `timeAllowed` limit is properly discounted for sub-requests. (Andrzej Bialecki, hossman)

Optimizations
---------------------
* SOLR-17568: The CLI bin/solr export tool now contacts the appropriate nodes directly for data instead of proxying through one.
(next changed file; path not shown)
@@ -758,14 +758,16 @@ protected void groupedFinishStage(final ResponseBuilder rb) {
protected void regularFinishStage(ResponseBuilder rb) {
// We may not have been able to retrieve all the docs due to an
// index change. Remove any null documents.
for (Iterator<SolrDocument> iter = rb.getResponseDocs().iterator(); iter.hasNext(); ) {
SolrDocumentList responseDocs =
rb.getResponseDocs() != null ? rb.getResponseDocs() : new SolrDocumentList();
for (Iterator<SolrDocument> iter = responseDocs.iterator(); iter.hasNext(); ) {
if (iter.next() == null) {
iter.remove();
rb.getResponseDocs().setNumFound(rb.getResponseDocs().getNumFound() - 1);
rb.getResponseDocs().setNumFound(responseDocs.getNumFound() - 1);
}
}

rb.rsp.addResponse(rb.getResponseDocs());
rb.rsp.addResponse(responseDocs);
if (null != rb.getNextCursorMark()) {
rb.rsp.add(CursorMarkParams.CURSOR_MARK_NEXT, rb.getNextCursorMark().getSerializedTotem());
}
@@ -1339,6 +1341,9 @@ protected void createRetrieveDocs(ResponseBuilder rb) {

// for each shard, collect the documents for that shard.
HashMap<String, Collection<ShardDoc>> shardMap = new HashMap<>();
if (rb.resultIds == null) {
rb.resultIds = new HashMap<>();
}
for (ShardDoc sdoc : rb.resultIds.values()) {
Collection<ShardDoc> shardDocs = shardMap.get(sdoc.shard);
if (shardDocs == null) {
(next changed file; path not shown)
@@ -137,6 +137,25 @@ public int getStage() {
return this.stage;
}

public String getStageName() {
switch (this.stage) {
case STAGE_START:
return "START";
case STAGE_PARSE_QUERY:
return "PARSE_QUERY";
case STAGE_TOP_GROUPS:
return "TOP_GROUPS";
case STAGE_EXECUTE_QUERY:
return "EXECUTE_QUERY";
case STAGE_GET_FIELDS:
return "GET_FIELDS";
case STAGE_DONE:
return "DONE";
default:
return Integer.toString(this.stage);
}
}

public void setStage(int stage) {
this.stage = stage;
}
(next changed file; path not shown)
@@ -72,6 +72,7 @@
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.CursorMark;
import org.apache.solr.search.QueryLimits;
import org.apache.solr.search.SortSpec;
import org.apache.solr.search.facet.FacetModule;
import org.apache.solr.security.AuthorizationContext;
@@ -524,7 +525,7 @@ private void processComponents(
}
}
} catch (ExitableDirectoryReader.ExitingReaderException ex) {
log.warn("Query: {}; ", req.getParamString(), ex);
log.warn("Query terminated: {}; ", req.getParamString(), ex);
shortCircuitedResults(req, rb);
}
} else {
@@ -563,6 +564,8 @@ private void processComponents(
// presume we'll get a response from each shard we send to
sreq.responses = new ArrayList<>(sreq.actualShards.length);

QueryLimits queryLimits = QueryLimits.getCurrentLimits();

// TODO: map from shard to address[]
for (String shard : sreq.actualShards) {
ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
@@ -586,6 +589,18 @@
params.set(CommonParams.QT, reqPath);
} // else if path is /select, then the qt gets passed thru if set
}
if (queryLimits.isLimitsEnabled()) {
if (queryLimits.adjustShardRequestLimits(sreq, shard, params, rb)) {
// Skip this shard since one or more limits will be tripped
if (log.isDebugEnabled()) {
log.debug(
"Skipping request to shard '{}' due to query limits, params {}",
shard,
params);
}
continue;
}
}
shardHandler1.submit(sreq, shard, params);
}
}
14 changes: 14 additions & 0 deletions solr/core/src/java/org/apache/solr/search/QueryLimit.java
@@ -17,6 +17,8 @@
package org.apache.solr.search;

import org.apache.lucene.index.QueryTimeout;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.component.ShardRequest;

public interface QueryLimit extends QueryTimeout {
/**
@@ -28,4 +30,16 @@ public interface QueryLimit extends QueryTimeout {
* user.
*/
Object currentValue();

/**
* Allow a limit to adjust shard request parameters if needed.
*
* @return true if the shard request should be skipped because a limit will be tripped after
* sending, during execution.
*/
default boolean adjustShardRequestLimit(
ShardRequest sreq, String shard, ModifiableSolrParams params) {
// default is to do nothing and process the request
return false;
}
}
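For illustration, a custom limit could use the new hook to veto further fan-out once a per-request budget is spent. The sketch below is not part of this PR; MaxFanoutLimit and its budget are hypothetical, and only the QueryLimit/QueryTimeout methods shown in this diff are assumed:

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.search.QueryLimit;

/** Hypothetical limit that skips shard sub-requests once a fan-out budget is used up. */
public class MaxFanoutLimit implements QueryLimit {
  private final int maxFanout;
  private final AtomicInteger sent = new AtomicInteger();

  public MaxFanoutLimit(int maxFanout) {
    this.maxFanout = maxFanout;
  }

  @Override
  public boolean shouldExit() {
    return false; // never aborts local execution; this example only limits fan-out
  }

  @Override
  public Object currentValue() {
    return sent.get(); // number of shard sub-requests attempted so far
  }

  @Override
  public boolean adjustShardRequestLimit(
      ShardRequest sreq, String shard, ModifiableSolrParams params) {
    // returning true tells the coordinator to skip this shard request
    return sent.incrementAndGet() > maxFanout;
  }
}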
84 changes: 69 additions & 15 deletions solr/core/src/java/org/apache/solr/search/QueryLimits.java
@@ -18,6 +18,7 @@

import static org.apache.solr.response.SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY;
import static org.apache.solr.search.CpuAllowedLimit.hasCpuLimit;
import static org.apache.solr.search.MemAllowedLimit.hasMemLimit;
import static org.apache.solr.search.TimeAllowedLimit.hasTimeLimit;

import java.util.ArrayList;
@@ -27,6 +28,9 @@
import org.apache.lucene.index.QueryTimeout;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
@@ -71,7 +75,7 @@ public QueryLimits(SolrQueryRequest req, SolrQueryResponse rsp) {
if (hasCpuLimit(req)) {
limits.add(new CpuAllowedLimit(req));
}
if (MemAllowedLimit.hasMemLimit(req)) {
if (hasMemLimit(req)) {
limits.add(new MemAllowedLimit(req));
}
}
@@ -105,6 +109,18 @@ public String formatExceptionMessage(String label) {
+ limitStatusMessage();
}

public String formatShardLimitExceptionMessage(String label, String shard, QueryLimit limit) {
return "Limit exceeded before sub-request!"
+ (label != null ? " (" + label + ")" : "")
+ ": ["
+ limit.getClass().getSimpleName()
+ ": "
+ limit.currentValue()
+ "] on shard "
+ shard
+ ". This shard request will be skipped.";
}

/**
* If limit is reached then depending on the request param {@link CommonParams#PARTIAL_RESULTS}
* either mark it as partial result in the response and signal the caller to return, or throw an
@@ -118,21 +134,21 @@ public String formatExceptionMessage(String label) {
public boolean maybeExitWithPartialResults(Supplier<String> label)
throws QueryLimitsExceededException {
if (isLimitsEnabled() && shouldExit()) {
if (allowPartialResults) {
if (rsp != null) {
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
if (requestInfo == null) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"No request active, but attempting to exit with partial results?");
}
rsp.setPartialResults(requestInfo.getReq());
if (rsp.getResponseHeader().get(RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY) == null) {
// don't want to add duplicate keys. Although technically legal, there's a strong risk
// that clients won't anticipate it and break.
rsp.addPartialResponseDetail(formatExceptionMessage(label.get()));
}
if (rsp != null) {
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
if (requestInfo == null) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"No request active, but attempting to exit with partial results?");
}
rsp.setPartialResults(requestInfo.getReq());
if (rsp.getResponseHeader().get(RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY) == null) {
// don't want to add duplicate keys. Although technically legal, there's a strong risk
// that clients won't anticipate it and break.
rsp.addPartialResponseDetail(formatExceptionMessage(label.get()));
}
}
if (allowPartialResults) {
return true;
} else {
throw new QueryLimitsExceededException(formatExceptionMessage(label.get()));
@@ -185,6 +201,44 @@ public boolean isLimitsEnabled() {
return !limits.isEmpty();
}

/**
* Allow each limit to adjust the shard request parameters if needed. This is useful for
* timeAllowed, cpu, or memory limits that need to be propagated to shards with potentially
* modified values.
*
* @return true if the shard request should be skipped because one or more limits would be tripped
* after sending, during execution.
* @throws QueryLimitsExceededException if {@link #allowPartialResults} is false and limits would
* have been tripped by the shard request.
*/
public boolean adjustShardRequestLimits(
ShardRequest sreq, String shard, ModifiableSolrParams params, ResponseBuilder rb)
throws QueryLimitsExceededException {
boolean result = false;
for (QueryLimit limit : limits) {
boolean shouldSkip = limit.adjustShardRequestLimit(sreq, shard, params);
if (shouldSkip) {
String label = "stage: " + rb.getStageName();
if (rsp != null) {
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
if (requestInfo == null) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"No request active, but attempting to exit with partial results?");
}
rsp.setPartialResults(requestInfo.getReq());
rsp.addPartialResponseDetail(formatShardLimitExceptionMessage(label, shard, limit));
}
if (!allowPartialResults) {
throw new QueryLimitsExceededException(
formatShardLimitExceptionMessage(label, shard, limit));
}
}
result = result || shouldSkip;
}
return result;
}

/**
* Helper method to retrieve the current QueryLimits from {@link SolrRequestInfo#getRequestInfo()}
* if it exists, otherwise it returns {@link #NONE}.
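To make the skip path concrete: when a limit vetoes a sub-request and partial results are allowed, the coordinator marks the response as partial and records a detail built by formatShardLimitExceptionMessage. With invented values (the shard string and the nanosecond counter returned by TimeAllowedLimit.currentValue() are illustrative), the recorded detail looks like this:

Limit exceeded before sub-request! (stage: GET_FIELDS): [TimeAllowedLimit: 412345678] on shard http://node1:8983/solr/coll1_shard2_replica_n4/. This shard request will be skipped.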
67 changes: 57 additions & 10 deletions solr/core/src/java/org/apache/solr/search/TimeAllowedLimit.java
@@ -18,19 +18,38 @@

import static java.lang.System.nanoTime;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Enforces a wall clock based timeout on a given SolrQueryRequest. This class holds the logic for
* the {@code timeAllowed} query parameter. Note that timeAllowed will be ignored for
* <strong><em>local</em></strong> processing of sub-queries in cases where the parent query already
* has {@code timeAllowed} set. Essentially only one timeAllowed can be specified for any thread
* executing a query. This is to ensure that subqueries don't escape from the intended limit
* executing a query. This is to ensure that subqueries don't escape from the intended limit.
*
* <p>Distributed requests will approximately track the original starting point of the parent
* request. Shard requests may be skipped if the limit would run out shortly after they are sent;
* this in-flight allowance is given in milliseconds by {@link #INFLIGHT_PARAM} and defaults to
* {@link #DEFAULT_INFLIGHT_MS}.
*/
public class TimeAllowedLimit implements QueryLimit {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static final String USED_PARAM = "_taUsed";
public static final String INFLIGHT_PARAM = "timeAllowed.inflight";

/** Small, arbitrary amount of time (in ms) to account for the network flight time of a request. */
public static final long DEFAULT_INFLIGHT_MS = 2L;

private final long reqTimeAllowedMs;
private final long reqInflightMs;
private final long timeoutAt;
private final long timingSince;

@@ -43,19 +62,46 @@ public class TimeAllowedLimit implements QueryLimit {
* object
*/
public TimeAllowedLimit(SolrQueryRequest req) {
// reduce by time already spent
long reqTimeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L);
// original timeAllowed in milliseconds
reqTimeAllowedMs = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L);

if (reqTimeAllowed == -1L) {
if (reqTimeAllowedMs == -1L) {
throw new IllegalArgumentException(
"Check for limit with hasTimeLimit(req) before creating a TimeAllowedLimit");
}
long timeAlreadySpent = (long) req.getRequestTimer().getTime();
long now = nanoTime();
long timeAllowed = reqTimeAllowed - timeAlreadySpent;
long nanosAllowed = TimeUnit.NANOSECONDS.convert(timeAllowed, TimeUnit.MILLISECONDS);
timeoutAt = now + nanosAllowed;
timingSince = now - timeAlreadySpent;
// time already spent locally before this limit was initialized, in milliseconds
long timeAlreadySpentMs = (long) req.getRequestTimer().getTime();
long parentUsedMs = req.getParams().getLong(USED_PARAM, -1L);
reqInflightMs = req.getParams().getLong(INFLIGHT_PARAM, DEFAULT_INFLIGHT_MS);
if (parentUsedMs != -1L) {
// this is a sub-request of a request that already had timeAllowed set.
// We have to deduct the time already used by the parent request.
log.debug("parentUsedMs: {}", parentUsedMs);
timeAlreadySpentMs += parentUsedMs;
}
long nowNs = nanoTime();
long remainingTimeAllowedMs = reqTimeAllowedMs - timeAlreadySpentMs;
log.debug("remainingTimeAllowedMs: {}", remainingTimeAllowedMs);
long remainingTimeAllowedNs = TimeUnit.MILLISECONDS.toNanos(remainingTimeAllowedMs);
timeoutAt = nowNs + remainingTimeAllowedNs;
timingSince = nowNs - TimeUnit.MILLISECONDS.toNanos(timeAlreadySpentMs);
}

@Override
public boolean adjustShardRequestLimit(
ShardRequest sreq, String shard, ModifiableSolrParams params) {
long usedTimeAllowedMs = TimeUnit.NANOSECONDS.toMillis(nanoTime() - timingSince);
// increase by the expected in-flight time
usedTimeAllowedMs += reqInflightMs;
boolean result = false;
if (usedTimeAllowedMs >= reqTimeAllowedMs) {
// there's no point in sending this request to the shard because the time will run out
// before it's processed at the target
result = true;
}
params.set(USED_PARAM, Long.toString(usedTimeAllowedMs));
log.debug("adjustShardRequestLimit: used {} ms (skip? {})", usedTimeAllowedMs, result);
return result;
}

/** Return true if the current request has a parameter with a valid value of the limit. */
Expand All @@ -69,6 +115,7 @@ public boolean shouldExit() {
return timeoutAt - nanoTime() < 0L;
}

/** Return elapsed time in nanoseconds since this limit was started. */
@Override
public Object currentValue() {
return nanoTime() - timingSince;
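A worked example of the budget arithmetic above, with invented numbers: a request arrives with timeAllowed=500. By the time the coordinator prepares a shard sub-request it has spent 180 ms, so adjustShardRequestLimit adds the default 2 ms in-flight allowance and sets _taUsed=182 on the sub-request; the shard's own TimeAllowedLimit deducts that from its budget, leaving roughly 500 - 182 = 318 ms. Had 499 ms already been used, 499 + 2 >= 500, so the sub-request would be skipped instead and the response marked partial (or a QueryLimitsExceededException thrown when partial results are disallowed).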
(next changed file; path not shown)
@@ -132,12 +132,6 @@ private ShardRequest[] createRequest(ResponseBuilder rb, String[] shards) {
sreq.params.set(CommonParams.FL, schema.getUniqueKeyField().getName());
}

int origTimeAllowed = sreq.params.getInt(CommonParams.TIME_ALLOWED, -1);
if (origTimeAllowed > 0) {
sreq.params.set(
CommonParams.TIME_ALLOWED, Math.max(1, origTimeAllowed - rb.firstPhaseElapsedTime));
}

return new ShardRequest[] {sreq};
}
}
Loading