SOLR-12217: Support shards.preference for individual shard requests #984

Merged (8 commits, Dec 9, 2019)
22 changes: 22 additions & 0 deletions solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -42,7 +42,10 @@
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -160,10 +163,29 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
return;
}


final SolrCore core = req.getCore(); // explicit check for null core (temporary?, for tests)
ZkController zkController = core == null ? null : core.getCoreContainer().getZkController();
RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
if (zkController != null) {
requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator(
zkController.getZkStateReader().getClusterProperties()
.getOrDefault(ZkStateReader.DEFAULT_SHARD_PREFERENCES, "")
.toString(),
zkController.getNodeName(),
zkController.getBaseUrl(),
zkController.getSysPropsCacher()
);
} else {
requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
}

int worker = params.getInt("workerID", 0);
int numWorkers = params.getInt("numWorkers", 1);
boolean local = params.getBool("streamLocalOnly", false);
StreamContext context = new StreamContext();
context.setRequestParams(params);
context.setRequestReplicaListTransformerGenerator(requestReplicaListTransformerGenerator);
context.put("shards", getCollectionShards(params));
context.workerID = worker;
context.numWorkers = numWorkers;
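The block above seeds a RequestReplicaListTransformerGenerator with the cluster-wide defaultShardPreferences property (falling back to a plain generator when not running in SolrCloud) and hands it to the StreamContext a few lines below. A rough sketch of the intended behavior, as far as it can be read from this diff; all values are made up, and the null stands in for the ZK-backed sysprops cacher, which should only matter for node.sysprop preferences:

    import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
    import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
    import org.apache.solr.common.params.ModifiableSolrParams;

    public class ReplicaListTransformerGeneratorSketch {
      public static void main(String[] args) {
        // Seeded like the SolrCloud branch above: default preferences, node name, base URL, sysprops cacher.
        RequestReplicaListTransformerGenerator generator =
            new RequestReplicaListTransformerGenerator(
                "replica.type:PULL", "node1:8983_solr", "http://node1:8983/solr", null);

        // Request without shards.preference: the cluster-wide default above should apply.
        ReplicaListTransformer fromDefault =
            generator.getReplicaListTransformer(new ModifiableSolrParams());

        // Request with an explicit shards.preference: it takes precedence over the default.
        ModifiableSolrParams params = new ModifiableSolrParams();
        params.set("shards.preference", "replica.location:local");
        ReplicaListTransformer fromRequest = generator.getReplicaListTransformer(params);
      }
    }
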
@@ -55,6 +55,8 @@
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@@ -100,6 +102,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {

private final boolean updatesToLeaders;
private final boolean directUpdatesToLeadersOnly;
private final RequestReplicaListTransformerGenerator requestRLTGenerator;
boolean parallelUpdates; //TODO final
private ExecutorService threadPool = ExecutorUtil
.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
@@ -221,6 +224,7 @@ protected BaseCloudSolrClient(boolean updatesToLeaders, boolean parallelUpdates,
this.updatesToLeaders = updatesToLeaders;
this.parallelUpdates = parallelUpdates;
this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
}

/** Sets the cache TTL for cached DocCollection objects. This is only applicable for collections which are persisted outside of clusterstate.json
@@ -467,6 +471,8 @@ private NamedList<Object> directUpdate(AbstractUpdateRequest request, String col
for(String param : NON_ROUTABLE_PARAMS) {
routableParams.remove(param);
}
} else {
params = new ModifiableSolrParams();
}

if (collection == null) {
@@ -492,10 +498,12 @@ private NamedList<Object> directUpdate(AbstractUpdateRequest request, String col
return null;
}

ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(params);

//Create the URL map, which is keyed on slice name.
//The value is a list of URLs for each replica in the slice.
//The first value in the list is the leader for the slice.
final Map<String,List<String>> urlMap = buildUrlMap(col);
final Map<String,List<String>> urlMap = buildUrlMap(col, replicaListTransformer);
final Map<String, ? extends LBSolrClient.Req> routes = createRoutes(updateRequest, routableParams, col, router, urlMap, idField);
if (routes == null) {
if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
@@ -616,7 +624,7 @@ protected RouteException getRouteException(SolrException.ErrorCode serverError,
return urlMap == null ? null : updateRequest.getRoutesToCollection(router, col, urlMap, routableParams, idField);
}

private Map<String,List<String>> buildUrlMap(DocCollection col) {
private Map<String,List<String>> buildUrlMap(DocCollection col, ReplicaListTransformer replicaListTransformer) {
Map<String, List<String>> urlMap = new HashMap<>();
Slice[] slices = col.getActiveSlicesArr();
for (Slice slice : slices) {
@@ -639,8 +647,8 @@ private Map<String,List<String>> buildUrlMap(DocCollection col) {
return null;
}
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
String url = zkProps.getCoreUrl();
urls.add(url);
String leaderUrl = zkProps.getCoreUrl();

if (!directUpdatesToLeadersOnly) {
for (Replica replica : slice.getReplicas()) {
if (!replica.getNodeName().equals(leader.getNodeName()) &&
@@ -651,6 +659,13 @@
}
}
}

// Sort the non-leader replicas according to the request parameters
replicaListTransformer.transform(urls);
Review comment (Member): Is this necessary for updates? Are you trying to keep consistency only, or do you have some use case in mind?

Reply (Contributor, Author): Mainly for consistency. I guess it's mostly a no-op, since the leader is always going to be first anyway (if a leader exists).

// put the leaderUrl first.
urls.add(0, leaderUrl);

urlMap.put(name, urls);
}
return urlMap;
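To make the review exchange above concrete: after this change, each slice's URL list is built by sorting the non-leader replicas with the ReplicaListTransformer and then re-inserting the leader at position 0, so update routing still targets the leader first. A toy illustration with made-up URLs:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class UrlOrderingSketch {
      public static void main(String[] args) {
        // Non-leader replica URLs for one slice, already ordered by the transformer
        // according to shards.preference (e.g. a PULL replica preferred first).
        List<String> urls = new ArrayList<>(Arrays.asList(
            "http://node2:8983/solr/coll_shard1_replica_p2",
            "http://node3:8983/solr/coll_shard1_replica_n3"));

        // The leader goes back to index 0, exactly as urls.add(0, leaderUrl) does above.
        urls.add(0, "http://node1:8983/solr/coll_shard1_replica_n1");

        System.out.println(urls); // leader first, then preference-ordered replicas
      }
    }
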
@@ -1046,6 +1061,8 @@ protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputC
reqParams = new ModifiableSolrParams();
}

ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(reqParams);

final Set<String> liveNodes = getClusterStateProvider().getLiveNodes();

final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
@@ -1087,7 +1104,6 @@ protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputC
}

// Gather URLs, grouped by leader or replica
// TODO: allow filtering by group, role, etc
Set<String> seenNodes = new HashSet<>();
List<String> replicas = new ArrayList<>();
String joinedInputCollections = StrUtils.join(inputCollections, ',');
@@ -1109,11 +1125,12 @@ protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputC
}
}

// Shuffle the leaders, if any (none if !sendToLeaders)
// Sort the leader replicas, if any, according to the request preferences (none if !sendToLeaders)
replicaListTransformer.transform(theUrlList);
Collections.shuffle(theUrlList, rand);

// Shuffle the replicas, if any, and append to our list
Collections.shuffle(replicas, rand);
// Sort the replicas, if any, according to the request preferences and append to our list
replicaListTransformer.transform(replicas);
theUrlList.addAll(replicas);

if (theUrlList.isEmpty()) {
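Taken together with the transform calls above, a request routed through CloudSolrClient can now bias which replica receives it. A hedged usage sketch; the ZooKeeper address, collection name, and preference value are illustrative, and the rest is standard SolrJ API:

    import java.util.Collections;
    import java.util.Optional;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.response.QueryResponse;
    import org.apache.solr.common.params.ModifiableSolrParams;

    public class ShardsPreferenceQuerySketch {
      public static void main(String[] args) throws Exception {
        try (CloudSolrClient client = new CloudSolrClient.Builder(
            Collections.singletonList("localhost:2181"), Optional.empty()).build()) {
          ModifiableSolrParams params = new ModifiableSolrParams();
          params.set("q", "*:*");
          // Prefer PULL replicas when the client picks a URL for the request.
          params.set("shards.preference", "replica.type:PULL");
          QueryResponse rsp = client.query("collection1", params);
          System.out.println("hits: " + rsp.getResults().getNumFound());
        }
      }
    }
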
@@ -371,12 +371,13 @@ public static Slice[] getSlices(String collectionName, ZkStateReader zkStateRead
protected void constructStreams() throws IOException {
try {

List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);

ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator.

List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);

for(String shardUrl : shardUrls) {
SolrStream solrStream = new SolrStream(shardUrl, mParams);
if(streamContext != null) {
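Since constructStreams() now passes the stream's params into getShards(), a shards.preference set on the stream reaches per-shard replica selection. A hedged sketch using the SolrJ streaming API directly; the ZooKeeper address and collection are illustrative:

    import org.apache.solr.client.solrj.io.SolrClientCache;
    import org.apache.solr.client.solrj.io.Tuple;
    import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
    import org.apache.solr.client.solrj.io.stream.StreamContext;
    import org.apache.solr.common.params.ModifiableSolrParams;

    public class ShardsPreferenceStreamSketch {
      public static void main(String[] args) throws Exception {
        ModifiableSolrParams params = new ModifiableSolrParams();
        params.set("q", "*:*");
        params.set("fl", "id");
        params.set("sort", "id asc");
        // Illustrative preference: send each shard request to a replica on the local node if possible.
        params.set("shards.preference", "replica.location:local");

        StreamContext context = new StreamContext();
        SolrClientCache cache = new SolrClientCache();
        context.setSolrClientCache(cache);

        CloudSolrStream stream = new CloudSolrStream("localhost:9983", "collection1", params);
        stream.setStreamContext(context);
        try {
          stream.open();
          for (Tuple tuple = stream.read(); !tuple.EOF; tuple = stream.read()) {
            System.out.println(tuple.getString("id"));
          }
        } finally {
          stream.close();
          cache.close();
        }
      }
    }
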
@@ -309,12 +309,12 @@ public static Slice[] getSlices(String collectionName, ZkStateReader zkStateRead

protected void constructStreams() throws IOException {
try {

List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);

ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator.

List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);

String rows = mParams.get(ROWS);
int r = Integer.parseInt(rows);
int newRows = r/shardUrls.size();
@@ -241,7 +241,6 @@ public List<TupleStream> children() {
}

private List<Future<NamedList>> callShards(List<String> baseUrls) throws IOException {

List<Future<NamedList>> futures = new ArrayList<>();
for (String baseUrl : baseUrls) {
SignificantTermsCall lc = new SignificantTermsCall(baseUrl,
@@ -282,6 +281,7 @@ public Explanation toExplanation(StreamFactory factory) throws IOException {
}

public Tuple read() throws IOException {

try {
if (tupleIterator == null) {
Map<String, int[]> mergeFreqs = new HashMap<>();
@@ -24,6 +24,8 @@
import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.common.params.SolrParams;

/**
* The StreamContext is passed to TupleStreams using the TupleStream.setStreamContext() method.
@@ -46,6 +48,8 @@ public class StreamContext implements Serializable {
private ModelCache modelCache;
private StreamFactory streamFactory;
private boolean local;
private SolrParams requestParams;
private RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;

public ConcurrentMap getObjectCache() {
return this.objectCache;
@@ -110,4 +114,20 @@ public void setLocal(boolean local) {
public boolean isLocal() {
return local;
}

public void setRequestParams(SolrParams requestParams) {
this.requestParams = requestParams;
}

public SolrParams getRequestParams() {
return requestParams;
}

public void setRequestReplicaListTransformerGenerator(RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator) {
this.requestReplicaListTransformerGenerator = requestReplicaListTransformerGenerator;
}

public RequestReplicaListTransformerGenerator getRequestReplicaListTransformerGenerator() {
return requestReplicaListTransformerGenerator;
}
}
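A short sketch of how an embedder might fill in these new StreamContext fields so that getShards() (below) can see the original request; it mirrors the StreamHandler wiring above, with the no-argument generator standing in for the non-SolrCloud fallback:

    import org.apache.solr.client.solrj.io.SolrClientCache;
    import org.apache.solr.client.solrj.io.stream.StreamContext;
    import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
    import org.apache.solr.common.params.ModifiableSolrParams;

    public class StreamContextWiringSketch {
      public static void main(String[] args) {
        ModifiableSolrParams requestParams = new ModifiableSolrParams();
        requestParams.set("shards.preference", "replica.location:local");

        StreamContext context = new StreamContext();
        context.setSolrClientCache(new SolrClientCache());
        context.setRequestParams(requestParams);
        // No-arg generator, i.e. no cluster-wide default preferences (the StreamHandler fallback above).
        context.setRequestReplicaListTransformerGenerator(new RequestReplicaListTransformerGenerator());
      }
    }
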
@@ -22,9 +22,8 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.Map;
@@ -34,13 +33,17 @@
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;


/**
@@ -118,6 +121,14 @@ public static List<String> getShards(String zkHost,
String collection,
StreamContext streamContext)
throws IOException {
return getShards(zkHost, collection, streamContext, new ModifiableSolrParams());
}

public static List<String> getShards(String zkHost,
String collection,
StreamContext streamContext,
SolrParams requestParams)
throws IOException {
Map<String, List<String>> shardsMap = null;
List<String> shards = new ArrayList();

@@ -135,19 +146,28 @@ public static List<String> getShards(String zkHost,
ClusterState clusterState = zkStateReader.getClusterState();
Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
Set<String> liveNodes = clusterState.getLiveNodes();


ModifiableSolrParams solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
solrParams.add(requestParams);

RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator =
Optional.ofNullable(streamContext.getRequestReplicaListTransformerGenerator()).orElseGet(RequestReplicaListTransformerGenerator::new);

ReplicaListTransformer replicaListTransformer = requestReplicaListTransformerGenerator.getReplicaListTransformer(requestParams);

for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList<>();
List<String> replicaUrls = new ArrayList<>();
for(Replica replica : replicas) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
shuffler.add(replica);
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
replicaUrls.add(zkProps.getCoreUrl());
}
}

Collections.shuffle(shuffler, new Random());
Replica rep = shuffler.get(0);
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
String url = zkProps.getCoreUrl();
shards.add(url);
replicaListTransformer.transform(replicaUrls);
shards.add(replicaUrls.get(0));
}
}
Object core = streamContext.get("core");
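Finally, a hedged sketch of calling the new overload directly; it assumes the static helper shown here is TupleStream.getShards (it is invoked unqualified from CloudSolrStream above), and the ZooKeeper address and collection are made up:

    import java.util.List;
    import org.apache.solr.client.solrj.io.SolrClientCache;
    import org.apache.solr.client.solrj.io.stream.StreamContext;
    import org.apache.solr.client.solrj.io.stream.TupleStream;
    import org.apache.solr.common.params.ModifiableSolrParams;

    public class GetShardsSketch {
      public static void main(String[] args) throws Exception {
        StreamContext context = new StreamContext();
        context.setSolrClientCache(new SolrClientCache());

        ModifiableSolrParams mParams = new ModifiableSolrParams();
        mParams.set("q", "*:*");
        // With the new overload, this preference reaches the ReplicaListTransformer
        // that picks one replica URL per slice.
        mParams.set("shards.preference", "replica.type:PULL");

        List<String> shardUrls =
            TupleStream.getShards("localhost:9983", "collection1", context, mParams);
        shardUrls.forEach(System.out::println);
      }
    }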