Skip to content

Commit

Permalink
SOLR-12217: Support shards.preference for individual shard requests.
Browse files Browse the repository at this point in the history
  • Loading branch information
HoustonPutman committed Oct 30, 2019
1 parent 22b6817 commit 7a49167
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 32 deletions.
22 changes: 22 additions & 0 deletions solr/core/src/java/org/apache/solr/handler/StreamHandler.java
Expand Up @@ -42,7 +42,10 @@
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
Expand Down Expand Up @@ -160,10 +163,29 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
return;
}


final SolrCore core = req.getCore(); // explicit check for null core (temporary?, for tests)
ZkController zkController = core == null ? null : core.getCoreContainer().getZkController();
RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
if (zkController != null) {
requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator(
zkController.getZkStateReader().getClusterProperties()
.getOrDefault(ZkStateReader.DEFAULT_SHARD_PREFERENCES, "")
.toString(),
zkController.getNodeName(),
zkController.getBaseUrl(),
zkController.getSysPropsCacher()
);
} else {
requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
}

int worker = params.getInt("workerID", 0);
int numWorkers = params.getInt("numWorkers", 1);
boolean local = params.getBool("streamLocalOnly", false);
StreamContext context = new StreamContext();
context.setRequestParams(params);
context.setRequestReplicaListTransformerGenerator(requestReplicaListTransformerGenerator);
context.put("shards", getCollectionShards(params));
context.workerID = worker;
context.numWorkers = numWorkers;
Expand Down
Expand Up @@ -55,6 +55,8 @@
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
Expand Down Expand Up @@ -100,6 +102,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {

private final boolean updatesToLeaders;
private final boolean directUpdatesToLeadersOnly;
private final RequestReplicaListTransformerGenerator requestRLTGenerator;
boolean parallelUpdates; //TODO final
private ExecutorService threadPool = ExecutorUtil
.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
Expand Down Expand Up @@ -221,6 +224,7 @@ protected BaseCloudSolrClient(boolean updatesToLeaders, boolean parallelUpdates,
this.updatesToLeaders = updatesToLeaders;
this.parallelUpdates = parallelUpdates;
this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
}

/** Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json
Expand Down Expand Up @@ -467,6 +471,8 @@ private NamedList<Object> directUpdate(AbstractUpdateRequest request, String col
for(String param : NON_ROUTABLE_PARAMS) {
routableParams.remove(param);
}
} else {
params = new ModifiableSolrParams();
}

if (collection == null) {
Expand All @@ -492,10 +498,12 @@ private NamedList<Object> directUpdate(AbstractUpdateRequest request, String col
return null;
}

ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(params);

//Create the URL map, which is keyed on slice name.
//The value is a list of URLs for each replica in the slice.
//The first value in the list is the leader for the slice.
final Map<String,List<String>> urlMap = buildUrlMap(col);
final Map<String,List<String>> urlMap = buildUrlMap(col, replicaListTransformer);
final Map<String, ? extends LBSolrClient.Req> routes = createRoutes(updateRequest, routableParams, col, router, urlMap, idField);
if (routes == null) {
if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
Expand Down Expand Up @@ -616,7 +624,7 @@ protected RouteException getRouteException(SolrException.ErrorCode serverError,
return urlMap == null ? null : updateRequest.getRoutesToCollection(router, col, urlMap, routableParams, idField);
}

private Map<String,List<String>> buildUrlMap(DocCollection col) {
private Map<String,List<String>> buildUrlMap(DocCollection col, ReplicaListTransformer replicaListTransformer) {
Map<String, List<String>> urlMap = new HashMap<>();
Slice[] slices = col.getActiveSlicesArr();
for (Slice slice : slices) {
Expand All @@ -639,8 +647,8 @@ private Map<String,List<String>> buildUrlMap(DocCollection col) {
return null;
}
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
String url = zkProps.getCoreUrl();
urls.add(url);
String leaderUrl = zkProps.getCoreUrl();

if (!directUpdatesToLeadersOnly) {
for (Replica replica : slice.getReplicas()) {
if (!replica.getNodeName().equals(leader.getNodeName()) &&
Expand All @@ -651,6 +659,13 @@ private Map<String,List<String>> buildUrlMap(DocCollection col) {
}
}
}

// Sort the non-leader replicas according to the request parameters
replicaListTransformer.transform(urls);

// put the leaderUrl first.
urls.add(0, leaderUrl);

urlMap.put(name, urls);
}
return urlMap;
Expand Down Expand Up @@ -1046,6 +1061,8 @@ protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputC
reqParams = new ModifiableSolrParams();
}

ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(reqParams);

final Set<String> liveNodes = getClusterStateProvider().getLiveNodes();

final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
Expand Down Expand Up @@ -1087,7 +1104,6 @@ protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputC
}

// Gather URLs, grouped by leader or replica
// TODO: allow filtering by group, role, etc
Set<String> seenNodes = new HashSet<>();
List<String> replicas = new ArrayList<>();
String joinedInputCollections = StrUtils.join(inputCollections, ',');
Expand All @@ -1109,11 +1125,12 @@ protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputC
}
}

// Shuffle the leaders, if any (none if !sendToLeaders)
// Sort the leader replicas, if any, according to the request preferences (none if !sendToLeaders)
replicaListTransformer.transform(replicas);
Collections.shuffle(theUrlList, rand);

// Shuffle the replicas, if any, and append to our list
Collections.shuffle(replicas, rand);
// Sort the replicas, if any, according to the request preferences and append to our list
replicaListTransformer.transform(replicas);
theUrlList.addAll(replicas);

if (theUrlList.isEmpty()) {
Expand Down
Expand Up @@ -371,12 +371,13 @@ public static Slice[] getSlices(String collectionName, ZkStateReader zkStateRead
protected void constructStreams() throws IOException {
try {

List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);

ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator.

List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);

for(String shardUrl : shardUrls) {
SolrStream solrStream = new SolrStream(shardUrl, mParams);
if(streamContext != null) {
Expand Down
Expand Up @@ -309,12 +309,12 @@ public static Slice[] getSlices(String collectionName, ZkStateReader zkStateRead

protected void constructStreams() throws IOException {
try {

List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);

ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator.

List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);

String rows = mParams.get(ROWS);
int r = Integer.parseInt(rows);
int newRows = r/shardUrls.size();
Expand Down
Expand Up @@ -241,7 +241,6 @@ public List<TupleStream> children() {
}

private List<Future<NamedList>> callShards(List<String> baseUrls) throws IOException {

List<Future<NamedList>> futures = new ArrayList<>();
for (String baseUrl : baseUrls) {
SignificantTermsCall lc = new SignificantTermsCall(baseUrl,
Expand Down Expand Up @@ -282,6 +281,7 @@ public Explanation toExplanation(StreamFactory factory) throws IOException {
}

public Tuple read() throws IOException {

try {
if (tupleIterator == null) {
Map<String, int[]> mergeFreqs = new HashMap<>();
Expand Down
Expand Up @@ -24,6 +24,8 @@
import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.common.params.SolrParams;

/**
* The StreamContext is passed to TupleStreams using the TupleStream.setStreamContext() method.
Expand All @@ -46,6 +48,8 @@ public class StreamContext implements Serializable {
private ModelCache modelCache;
private StreamFactory streamFactory;
private boolean local;
private SolrParams requestParams;
private RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;

public ConcurrentMap getObjectCache() {
return this.objectCache;
Expand Down Expand Up @@ -110,4 +114,20 @@ public void setLocal(boolean local) {
public boolean isLocal() {
return local;
}

public void setRequestParams(SolrParams requestParams) {
this.requestParams = requestParams;
}

public SolrParams getRequestParams() {
return requestParams;
}

public void setRequestReplicaListTransformerGenerator(RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator) {
this.requestReplicaListTransformerGenerator = requestReplicaListTransformerGenerator;
}

public RequestReplicaListTransformerGenerator getRequestReplicaListTransformerGenerator() {
return requestReplicaListTransformerGenerator;
}
}
Expand Up @@ -22,9 +22,8 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.Map;
Expand All @@ -34,13 +33,17 @@
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;


/**
Expand Down Expand Up @@ -118,6 +121,14 @@ public static List<String> getShards(String zkHost,
String collection,
StreamContext streamContext)
throws IOException {
return getShards(zkHost, collection, streamContext, new ModifiableSolrParams());
}

public static List<String> getShards(String zkHost,
String collection,
StreamContext streamContext,
SolrParams requestParams)
throws IOException {
Map<String, List<String>> shardsMap = null;
List<String> shards = new ArrayList();

Expand All @@ -135,19 +146,28 @@ public static List<String> getShards(String zkHost,
ClusterState clusterState = zkStateReader.getClusterState();
Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
Set<String> liveNodes = clusterState.getLiveNodes();


ModifiableSolrParams solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
solrParams.add(requestParams);

RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator =
Optional.ofNullable(streamContext.getRequestReplicaListTransformerGenerator()).orElseGet(RequestReplicaListTransformerGenerator::new);

ReplicaListTransformer replicaListTransformer = requestReplicaListTransformerGenerator.getReplicaListTransformer(requestParams);

for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList<>();
List<String> replicaUrls = new ArrayList<>();
for(Replica replica : replicas) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
shuffler.add(replica);
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
replicaUrls.add(zkProps.getCoreUrl());
}
}

Collections.shuffle(shuffler, new Random());
Replica rep = shuffler.get(0);
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
String url = zkProps.getCoreUrl();
shards.add(url);
replicaListTransformer.transform(replicaUrls);
shards.add(replicaUrls.get(0));
}
}
Object core = streamContext.get("core");
Expand Down

0 comments on commit 7a49167

Please sign in to comment.