Skip to content

Commit

Permalink
SOLR-12217: Support shards.preference in SolrJ for individual shard r…
Browse files Browse the repository at this point in the history
…equests (#984)
  • Loading branch information
HoustonPutman committed Dec 9, 2019
1 parent 1eaa5b5 commit 1c78d2c
Show file tree
Hide file tree
Showing 17 changed files with 395 additions and 68 deletions.
2 changes: 2 additions & 0 deletions solr/CHANGES.txt
Expand Up @@ -134,6 +134,8 @@ New Features

* SOLR-13912: Add 'countvals' aggregation in JSON FacetModule (hossman, Munendra S N)

* SOLR-12217: Support shards.preference in SolrJ for single shard collections. The parameter is now used by the CloudSolrClient and Streaming Expressions. (Houston Putman, Tomas Fernandez-Lobbe)

Improvements
---------------------

Expand Down
22 changes: 22 additions & 0 deletions solr/core/src/java/org/apache/solr/handler/StreamHandler.java
Expand Up @@ -42,7 +42,10 @@
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
Expand Down Expand Up @@ -162,10 +165,29 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
return;
}


final SolrCore core = req.getCore(); // explicit check for null core (temporary?, for tests)
ZkController zkController = core == null ? null : core.getCoreContainer().getZkController();
RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
if (zkController != null) {
requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator(
zkController.getZkStateReader().getClusterProperties()
.getOrDefault(ZkStateReader.DEFAULT_SHARD_PREFERENCES, "")
.toString(),
zkController.getNodeName(),
zkController.getBaseUrl(),
zkController.getSysPropsCacher()
);
} else {
requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
}

int worker = params.getInt("workerID", 0);
int numWorkers = params.getInt("numWorkers", 1);
boolean local = params.getBool("streamLocalOnly", false);
StreamContext context = new StreamContext();
context.setRequestParams(params);
context.setRequestReplicaListTransformerGenerator(requestReplicaListTransformerGenerator);
context.put("shards", getCollectionShards(params));
context.workerID = worker;
context.numWorkers = numWorkers;
Expand Down
2 changes: 1 addition & 1 deletion solr/solr-ref-guide/src/distributed-requests.adoc
Expand Up @@ -160,7 +160,7 @@ Solr allows you to pass an optional string parameter named `shards.preference` t

The syntax is: `shards.preference=_property_:__value__`. The order of the properties and the values are significant: the first one is the primary sort, the second is secondary, etc.

IMPORTANT: `shards.preference` only works for distributed queries, i.e., queries targeting multiple shards. Single shard scenarios are not supported.
IMPORTANT: `shards.preference` is supported for single shard scenarios when using the SolrJ clients.

The properties that can be specified are as follows:

Expand Down
10 changes: 9 additions & 1 deletion solr/solr-ref-guide/src/streaming-expressions.adoc
Expand Up @@ -123,6 +123,15 @@ for the entire expression, it may be faster for the client to send the expressio
`&streamLocalOnly=true` and handle merging of the results (if required) locally. This is an advanced option, relying
on a convenient organization of the index, and should only be considered if normal usage poses a performance issue.

=== Request Routing

Streaming Expressions respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>> for any call to Solr.

The value of `shards.preference` that is used to route requests is determined in the following order. The first option available is used.
- Provided as a parameter in the streaming expression (e.g. `search(...., shards.preference="replica.type:PULL")`)
- Provided in the URL Params of the streaming expression (e.g. `http://solr_url:8983/solr/stream?expr=....&shards.preference=replica.type:PULL`)
- Set as a default in the Cluster properties.

=== Adding Custom Expressions

Creating your own custom expressions can be easily done by implementing the {solr-javadocs}/solr-solrj/org/apache/solr/client/solrj/io/stream/expr/Expressible.html[Expressible] interface. To add a custom expression to the
Expand All @@ -132,7 +141,6 @@ list of known mappings for the `/stream` handler, you just need to declare it as
<expressible name="custom" class="org.example.CustomStreamingExpression"/>



== Types of Streaming Expressions

=== About Stream Sources
Expand Down
8 changes: 8 additions & 0 deletions solr/solr-ref-guide/src/using-solrj.adoc
Expand Up @@ -120,6 +120,14 @@ include::{example-source-dir}UsingSolrJRefGuideExamplesTest.java[tag=solrj-solrc

When these values are not explicitly provided, SolrJ falls back to using the defaults for the OS/environment is running on.

=== Cloud Request Routing

The SolrJ `CloudSolrClient` implementations (`CloudSolrClient` and `CloudHttp2SolrClient`) respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>>.
Therefore requests sent to single-sharded collections, using either of the above clients, will route requests the same way that distributed requests are routed to individual shards.
If no `shards.preference` parameter is provided, the clients will default to sorting replicas randomly.

For update requests, while the replicas are sorted in the order defined by the request, leader replicas will always be sorted first.

== Querying in SolrJ
`SolrClient` has a number of `query()` methods for fetching results from Solr. Each of these methods takes in a `SolrParams`,an object encapsulating arbitrary query-parameters. And each method outputs a `QueryResponse`, a wrapper which can be used to access the result documents and other related metadata.

Expand Down
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrClient;
Expand All @@ -55,6 +56,8 @@
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
Expand All @@ -69,7 +72,6 @@
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
Expand Down Expand Up @@ -100,6 +102,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {

private final boolean updatesToLeaders;
private final boolean directUpdatesToLeadersOnly;
private final RequestReplicaListTransformerGenerator requestRLTGenerator;
boolean parallelUpdates; //TODO final
private ExecutorService threadPool = ExecutorUtil
.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
Expand Down Expand Up @@ -221,6 +224,7 @@ protected BaseCloudSolrClient(boolean updatesToLeaders, boolean parallelUpdates,
this.updatesToLeaders = updatesToLeaders;
this.parallelUpdates = parallelUpdates;
this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
}

/** Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json
Expand Down Expand Up @@ -467,6 +471,8 @@ private NamedList<Object> directUpdate(AbstractUpdateRequest request, String col
for(String param : NON_ROUTABLE_PARAMS) {
routableParams.remove(param);
}
} else {
params = new ModifiableSolrParams();
}

if (collection == null) {
Expand All @@ -492,10 +498,12 @@ private NamedList<Object> directUpdate(AbstractUpdateRequest request, String col
return null;
}

ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(params);

//Create the URL map, which is keyed on slice name.
//The value is a list of URLs for each replica in the slice.
//The first value in the list is the leader for the slice.
final Map<String,List<String>> urlMap = buildUrlMap(col);
final Map<String,List<String>> urlMap = buildUrlMap(col, replicaListTransformer);
final Map<String, ? extends LBSolrClient.Req> routes = createRoutes(updateRequest, routableParams, col, router, urlMap, idField);
if (routes == null) {
if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
Expand Down Expand Up @@ -616,12 +624,12 @@ protected RouteException getRouteException(SolrException.ErrorCode serverError,
return urlMap == null ? null : updateRequest.getRoutesToCollection(router, col, urlMap, routableParams, idField);
}

private Map<String,List<String>> buildUrlMap(DocCollection col) {
private Map<String,List<String>> buildUrlMap(DocCollection col, ReplicaListTransformer replicaListTransformer) {
Map<String, List<String>> urlMap = new HashMap<>();
Slice[] slices = col.getActiveSlicesArr();
for (Slice slice : slices) {
String name = slice.getName();
List<String> urls = new ArrayList<>();
List<Replica> sortedReplicas = new ArrayList<>();
Replica leader = slice.getLeader();
if (directUpdatesToLeadersOnly && leader == null) {
for (Replica replica : slice.getReplicas(
Expand All @@ -638,20 +646,22 @@ private Map<String,List<String>> buildUrlMap(DocCollection col) {
// take unoptimized general path - we cannot find a leader yet
return null;
}
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
String url = zkProps.getCoreUrl();
urls.add(url);

if (!directUpdatesToLeadersOnly) {
for (Replica replica : slice.getReplicas()) {
if (!replica.getNodeName().equals(leader.getNodeName()) &&
!replica.getName().equals(leader.getName())) {
ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
String url1 = zkProps1.getCoreUrl();
urls.add(url1);
if (!replica.equals(leader)) {
sortedReplicas.add(replica);
}
}
}
urlMap.put(name, urls);

// Sort the non-leader replicas according to the request parameters
replicaListTransformer.transform(sortedReplicas);

// put the leaderUrl first.
sortedReplicas.add(0, leader);

urlMap.put(name, sortedReplicas.stream().map(Replica::getCoreUrl).collect(Collectors.toList()));
}
return urlMap;
}
Expand Down Expand Up @@ -1046,6 +1056,8 @@ protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputC
reqParams = new ModifiableSolrParams();
}

ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(reqParams);

final Set<String> liveNodes = getClusterStateProvider().getLiveNodes();

final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
Expand Down Expand Up @@ -1087,34 +1099,38 @@ protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputC
}

// Gather URLs, grouped by leader or replica
// TODO: allow filtering by group, role, etc
Set<String> seenNodes = new HashSet<>();
List<String> replicas = new ArrayList<>();
String joinedInputCollections = StrUtils.join(inputCollections, ',');
List<Replica> sortedReplicas = new ArrayList<>();
List<Replica> replicas = new ArrayList<>();
for (Slice slice : slices.values()) {
for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
String node = coreNodeProps.getNodeName();
Replica leader = slice.getLeader();
for (Replica replica : slice.getReplicas()) {
String node = replica.getNodeName();
if (!liveNodes.contains(node) // Must be a live node to continue
|| Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
|| replica.getState() != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
continue;
if (seenNodes.add(node)) { // if we haven't yet collected a URL to this node...
String url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), joinedInputCollections);
if (sendToLeaders && coreNodeProps.isLeader()) {
theUrlList.add(url); // put leaders here eagerly (if sendToLeader mode)
} else {
replicas.add(url); // replicas here
}
if (sendToLeaders && replica.equals(leader)) {
sortedReplicas.add(replica); // put leaders here eagerly (if sendToLeader mode)
} else {
replicas.add(replica); // replicas here
}
}
}

// Shuffle the leaders, if any (none if !sendToLeaders)
Collections.shuffle(theUrlList, rand);
// Sort the leader replicas, if any, according to the request preferences (none if !sendToLeaders)
replicaListTransformer.transform(sortedReplicas);

// Sort the replicas, if any, according to the request preferences and append to our list
replicaListTransformer.transform(replicas);

// Shuffle the replicas, if any, and append to our list
Collections.shuffle(replicas, rand);
theUrlList.addAll(replicas);
sortedReplicas.addAll(replicas);

String joinedInputCollections = StrUtils.join(inputCollections, ',');
Set<String> seenNodes = new HashSet<>();
sortedReplicas.forEach( replica -> {
if (seenNodes.add(replica.getNodeName())) {
theUrlList.add(ZkCoreNodeProps.getCoreUrl(replica.getBaseUrl(), joinedInputCollections));
}
});

if (theUrlList.isEmpty()) {
collectionStateCache.keySet().removeAll(collectionNames);
Expand Down
Expand Up @@ -371,12 +371,13 @@ public static Slice[] getSlices(String collectionName, ZkStateReader zkStateRead
protected void constructStreams() throws IOException {
try {

List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);

ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator.

List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);

for(String shardUrl : shardUrls) {
SolrStream solrStream = new SolrStream(shardUrl, mParams);
if(streamContext != null) {
Expand Down
Expand Up @@ -309,12 +309,12 @@ public static Slice[] getSlices(String collectionName, ZkStateReader zkStateRead

protected void constructStreams() throws IOException {
try {

List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);

ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator.

List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);

String rows = mParams.get(ROWS);
int r = Integer.parseInt(rows);
int newRows = r/shardUrls.size();
Expand Down
Expand Up @@ -24,6 +24,8 @@
import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.common.params.SolrParams;

/**
* The StreamContext is passed to TupleStreams using the TupleStream.setStreamContext() method.
Expand All @@ -46,6 +48,8 @@ public class StreamContext implements Serializable {
private ModelCache modelCache;
private StreamFactory streamFactory;
private boolean local;
private SolrParams requestParams;
private RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;

public ConcurrentMap getObjectCache() {
return this.objectCache;
Expand Down Expand Up @@ -110,4 +114,20 @@ public void setLocal(boolean local) {
public boolean isLocal() {
return local;
}

public void setRequestParams(SolrParams requestParams) {
this.requestParams = requestParams;
}

public SolrParams getRequestParams() {
return requestParams;
}

public void setRequestReplicaListTransformerGenerator(RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator) {
this.requestReplicaListTransformerGenerator = requestReplicaListTransformerGenerator;
}

public RequestReplicaListTransformerGenerator getRequestReplicaListTransformerGenerator() {
return requestReplicaListTransformerGenerator;
}
}

0 comments on commit 1c78d2c

Please sign in to comment.