Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17066: Switch ConcurrentSolrClients away from core URLs #2254

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ static class Builder extends ConcurrentUpdateHttp2SolrClient.Builder {
protected SolrCmdDistributor.Req req;
protected List<SolrError> errors;

/**
* @param baseSolrUrl the base URL of a Solr node. Should <em>not</em> contain a collection or
* core name
* @param client
* @param req
* @param errors
*/
public Builder(
String baseSolrUrl,
Http2SolrClient client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ private SolrClient buildClient(CloseableHttpClient httpClient, URL url) {
switch (random().nextInt(3)) {
case 0:
// currently, only testing with 1 thread
return new ConcurrentUpdateSolrClient.Builder(url.toString() + "/" + COLLECTION)
return new ConcurrentUpdateSolrClient.Builder(url.toString())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sigh... if only a builder new what a url was and we didn't .tostring everywhere. ;-) #beatdeadhorse

.withDefaultCollection(COLLECTION)
.withHttpClient(httpClient)
.withQueueSize(6)
.withThreadCount(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,8 +812,8 @@ public void testConcurrentIndexing() throws Exception {
final int numDocs = atLeast(50);
final JettySolrRunner nodeToUpdate = cluster.getRandomJetty(random());
try (ConcurrentUpdateSolrClient indexClient =
new ConcurrentUpdateSolrClient.Builder(
nodeToUpdate.getProxyBaseUrl() + "/" + collectionName)
new ConcurrentUpdateSolrClient.Builder(nodeToUpdate.getProxyBaseUrl().toString())
.withDefaultCollection(collectionName)
.withQueueSize(10)
.withThreadCount(2)
.build()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,11 @@ public void setQueryParams(Set<String> queryParams) {

/**
* Determines whether this request should use or ignore any specified collections (esp. {@link
* SolrClient#defaultCollection})
* SolrClient#getDefaultCollection})
*
* <p>Many Solr requests target a particular core or collection. But not all of them - many Solr
* APIs (e.g. security or other admin APIs) are agnostic of collections entirely. This method
* gives these requests a way to opt out of using {@link SolrClient#defaultCollection} or other
* gives these requests a way to opt out of using {@link SolrClient#getDefaultCollection} or other
* specified collections.
*/
public boolean requiresCollection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ public NamedList<Object> request(SolrRequest<?> request, String collection)
if (requestCollection != null) {
collection = requestCollection;
} else if (collection == null) {
collection = defaultCollection;
collection = getDefaultCollection();
}

List<String> inputCollections =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ private void addRunner() {
public NamedList<Object> request(final SolrRequest<?> request, String collection)
throws SolrServerException, IOException {
if (ClientUtils.shouldApplyDefaultCollection(collection, request))
collection = defaultCollection;
collection = getDefaultCollection();
if (!(request instanceof UpdateRequest)) {
request.setBasePath(basePath);
return client.request(request, collection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public void setCollection(String collection) {
public NamedList<Object> request(final SolrRequest<?> request, String collection)
throws SolrServerException, IOException {
if (ClientUtils.shouldApplyDefaultCollection(collection, request))
collection = defaultCollection;
collection = getDefaultCollection();
if (!(request instanceof UpdateRequest)) {
return client.request(request, collection);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;

import java.io.IOException;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.util.NamedList;

/**
* A {@link SolrClient} implementation that defers to a delegate SolrClient object.
*
* <p>Callers are responsible for closing the 'delegate' client themselves. {@link
* DelegatingSolrClient#close()} is a no-op
*/
public class DelegatingSolrClient extends SolrClient {

private final SolrClient delegate;

public DelegatingSolrClient(SolrClient delegate) {
this.delegate = delegate;
}

@Override
public NamedList<Object> request(SolrRequest<?> request, String collection)
throws SolrServerException, IOException {
if (ClientUtils.shouldApplyDefaultCollection(collection, request)) {
collection = getDefaultCollection();
}
return delegate.request(request, collection);
}

@Override
public void close() throws IOException {
/* No-op */
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ public void onFailure(Response response, Throwable failure) {
public NamedList<Object> request(SolrRequest<?> solrRequest, String collection)
throws SolrServerException, IOException {
if (ClientUtils.shouldApplyDefaultCollection(collection, solrRequest))
collection = defaultCollection;
collection = getDefaultCollection();
String url = getRequestPath(solrRequest, collection);
Throwable abortCause = null;
Request req = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public NamedList<Object> request(
final SolrRequest<?> request, final ResponseParser processor, String collection)
throws SolrServerException, IOException {
if (ClientUtils.shouldApplyDefaultCollection(collection, request))
collection = defaultCollection;
collection = getDefaultCollection();
HttpRequestBase method = createMethod(request, collection);
setBasicAuthHeader(request, method);
if (request.getHeaders() != null) {
Expand Down Expand Up @@ -297,7 +297,7 @@ public HttpUriRequestResponse httpUriRequest(
final SolrRequest<?> request, final ResponseParser processor)
throws SolrServerException, IOException {
HttpUriRequestResponse mrr = new HttpUriRequestResponse();
final HttpRequestBase method = createMethod(request, defaultCollection);
final HttpRequestBase method = createMethod(request, getDefaultCollection());
ExecutorService pool =
ExecutorUtil.newMDCAwareFixedThreadPool(1, new SolrNamedThreadFactory("httpUriRequest"));
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.URLUtil;
import org.slf4j.MDC;

/**
Expand Down Expand Up @@ -94,7 +95,25 @@ private LBHttp2SolrClient(Builder builder) {

@Override
protected SolrClient getClient(String baseUrl) {
return solrClient;
final String effectiveDefaultCollection = getEffectiveDefaultCollection(baseUrl);
return new DelegatingSolrClient(solrClient) {
@Override
public String getDefaultCollection() {
return effectiveDefaultCollection;
}
};
}

private String getEffectiveDefaultCollection(String baseUrl) {
if (getDefaultCollection() != null) {
return getDefaultCollection();
} else if (!URLUtil.isBaseUrl(baseUrl)) {
return URLUtil.extractCoreFromCoreUrl(baseUrl);
} else if (solrClient.getDefaultCollection() != null) {
return solrClient.getDefaultCollection();
} else {
return null;
}
}

@Override
Expand Down Expand Up @@ -193,64 +212,65 @@ private Cancellable doRequest(
RetryListener listener) {
rsp.server = baseUrl;
req.getRequest().setBasePath(baseUrl);
return ((Http2SolrClient) getClient(baseUrl))
.asyncRequest(
req.getRequest(),
null,
new AsyncListener<>() {
@Override
public void onSuccess(NamedList<Object> result) {
rsp.rsp = result;
final String effectiveDefaultCollection =
req.getRequest().requiresCollection() ? getEffectiveDefaultCollection(baseUrl) : null;
return solrClient.asyncRequest(
req.getRequest(),
effectiveDefaultCollection,
new AsyncListener<>() {
@Override
public void onSuccess(NamedList<Object> result) {
rsp.rsp = result;
if (isZombie) {
zombieServers.remove(baseUrl);
}
listener.onSuccess(rsp);
}

@Override
public void onFailure(Throwable oe) {
try {
throw (Exception) oe;
} catch (BaseHttpSolrClient.RemoteExecutionException e) {
listener.onFailure(e, false);
} catch (SolrException e) {
// we retry on 404 or 403 or 503 or 500
// unless it's an update - then we only retry on connect exception
if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
// Server is alive but the request was likely malformed or invalid
if (isZombie) {
zombieServers.remove(baseUrl);
}
listener.onSuccess(rsp);
listener.onFailure(e, false);
}

@Override
public void onFailure(Throwable oe) {
try {
throw (Exception) oe;
} catch (BaseHttpSolrClient.RemoteExecutionException e) {
listener.onFailure(e, false);
} catch (SolrException e) {
// we retry on 404 or 403 or 503 or 500
// unless it's an update - then we only retry on connect exception
if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
// Server is alive but the request was likely malformed or invalid
if (isZombie) {
zombieServers.remove(baseUrl);
}
listener.onFailure(e, false);
}
} catch (SocketException e) {
if (!isNonRetryable || e instanceof ConnectException) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (SocketTimeoutException e) {
if (!isNonRetryable) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (SolrServerException e) {
Throwable rootCause = e.getRootCause();
if (!isNonRetryable && rootCause instanceof IOException) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else if (isNonRetryable && rootCause instanceof ConnectException) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (Exception e) {
listener.onFailure(new SolrServerException(e), false);
}
} catch (SocketException e) {
if (!isNonRetryable || e instanceof ConnectException) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (SocketTimeoutException e) {
if (!isNonRetryable) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (SolrServerException e) {
Throwable rootCause = e.getRootCause();
if (!isNonRetryable && rootCause instanceof IOException) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else if (isNonRetryable && rootCause instanceof ConnectException) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
listener.onFailure(e, false);
}
});
} catch (Exception e) {
listener.onFailure(new SolrServerException(e), false);
}
}
});
}

public static class Builder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ protected LBHttpSolrClient(Builder builder) {
? constructClient(builder.baseSolrUrls.toArray(new String[0]))
: builder.httpClient;
this.defaultCollection = builder.defaultCollection;
if (httpSolrClientBuilder != null && this.defaultCollection != null) {
httpSolrClientBuilder.defaultCollection = this.defaultCollection;
if (httpSolrClientBuilder != null && getDefaultCollection() != null) {
httpSolrClientBuilder.defaultCollection = getDefaultCollection();
}
this.connectionTimeoutMillis = builder.connectionTimeoutMillis;
this.soTimeoutMillis = builder.socketTimeoutMillis;
Expand Down Expand Up @@ -127,8 +127,8 @@ protected HttpSolrClient makeSolrClient(String server) {
.withHttpClient(httpClient)
.withConnectionTimeout(connectionTimeoutMillis, TimeUnit.MILLISECONDS)
.withSocketTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS);
if (defaultCollection != null) {
httpSolrClientBuilder.withDefaultCollection(defaultCollection);
if (getDefaultCollection() != null) {
httpSolrClientBuilder.withDefaultCollection(getDefaultCollection());
}
if (requestWriter != null) {
httpSolrClientBuilder.withRequestWriter(requestWriter);
Expand All @@ -149,11 +149,8 @@ protected HttpSolrClient makeSolrClient(String server) {
.withResponseParser(parser)
.withConnectionTimeout(connectionTimeoutMillis, TimeUnit.MILLISECONDS)
.withSocketTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS);

// Note that this may override the value extracted from the base URL..should we log a warning
// here?
if (defaultCollection != null) {
clientBuilder.withDefaultCollection(defaultCollection);
if (getDefaultCollection() != null) {
clientBuilder.withDefaultCollection(getDefaultCollection());
}
if (requestWriter != null) {
clientBuilder.withRequestWriter(requestWriter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,9 @@ private void checkAZombieServer(ServerWrapper zombieServer) {
try {
QueryRequest queryRequest = new QueryRequest(solrQuery);
queryRequest.setBasePath(zombieServer.baseUrl);
QueryResponse resp = queryRequest.process(getClient(zombieServer.getBaseUrl()));

final SolrClient client = getClient(zombieServer.getBaseUrl());
QueryResponse resp = queryRequest.process(client);
if (resp.getStatus() == 0) {
// server has come back up.
// make sure to remove from zombies before adding to alive to avoid a race condition
Expand Down Expand Up @@ -576,7 +578,7 @@ public NamedList<Object> request(
int numServersTried = 0;
Map<String, ServerWrapper> justFailed = null;
if (ClientUtils.shouldApplyDefaultCollection(collection, request))
collection = defaultCollection;
collection = getDefaultCollection();

boolean timeAllowedExceeded = false;
long timeAllowedNano = getTimeAllowedInNanos(request);
Expand Down Expand Up @@ -621,6 +623,7 @@ public NamedList<Object> request(
try {
++numServersTried;
request.setBasePath(wrapper.baseUrl);

NamedList<Object> rsp = getClient(wrapper.baseUrl).request(request, collection);
// remove from zombie list *before* adding to alive to avoid a race that could lose a server
zombieServers.remove(wrapper.getBaseUrl());
Expand Down