Skip to content

Commit

Permalink
Fix LBHttp2SolrClient zombie testing
Browse files Browse the repository at this point in the history
Zombie-resurrection was broken in LBHttp2SolrClient because the default
collection wasn't being taken into account on these requests.  The root
of the problem is the way that LBHttp2 takes in an already-constructed
SolrClient, making it difficult to set the default collection.  (In
contrast, LBHttpSolrClient creates a different client for each base URL
using a common Builder - this allows it to property set the default
collection.)

This commit fixes this by created a 'DelegatingSolrClient' that LBHttp2
can use internally around its single Http2SolrClient.  The default
collection is set on this 'Delegating' wrapper, so that it gets picked
up by the zombie-resurrection logic.
  • Loading branch information
gerlowskija committed Feb 9, 2024
1 parent 410676b commit e70b2e2
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 71 deletions.
Expand Up @@ -177,11 +177,11 @@ public void setQueryParams(Set<String> queryParams) {

/**
* Determines whether this request should use or ignore any specified collections (esp. {@link
* SolrClient#defaultCollection})
* SolrClient#getDefaultCollection})
*
* <p>Many Solr requests target a particular core or collection. But not all of them - many Solr
* APIs (e.g. security or other admin APIs) are agnostic of collections entirely. This method
* gives these requests a way to opt out of using {@link SolrClient#defaultCollection} or other
* gives these requests a way to opt out of using {@link SolrClient#getDefaultCollection} or other
* specified collections.
*/
public boolean requiresCollection() {
Expand Down
Expand Up @@ -752,7 +752,7 @@ public NamedList<Object> request(SolrRequest<?> request, String collection)
if (requestCollection != null) {
collection = requestCollection;
} else if (collection == null) {
collection = defaultCollection;
collection = getDefaultCollection();
}

List<String> inputCollections =
Expand Down
Expand Up @@ -362,7 +362,7 @@ private void addRunner() {
public NamedList<Object> request(final SolrRequest<?> request, String collection)
throws SolrServerException, IOException {
if (ClientUtils.shouldApplyDefaultCollection(collection, request))
collection = defaultCollection;
collection = getDefaultCollection();
if (!(request instanceof UpdateRequest)) {
request.setBasePath(basePath);
return client.request(request, collection);
Expand Down
Expand Up @@ -487,7 +487,7 @@ public void setCollection(String collection) {
public NamedList<Object> request(final SolrRequest<?> request, String collection)
throws SolrServerException, IOException {
if (ClientUtils.shouldApplyDefaultCollection(collection, request))
collection = defaultCollection;
collection = getDefaultCollection();
if (!(request instanceof UpdateRequest)) {
return client.request(request, collection);
}
Expand Down
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;

import java.io.IOException;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.util.NamedList;

/**
* A {@link SolrClient} implementation that defers to a delegate SolrClient object.
*
* <p>Callers are responsible for closing the 'delegate' client themselves. {@link
* DelegatingSolrClient#close()} is a no-op
*/
public class DelegatingSolrClient extends SolrClient {

private final SolrClient delegate;

public DelegatingSolrClient(SolrClient delegate) {
this.delegate = delegate;
}

@Override
public NamedList<Object> request(SolrRequest<?> request, String collection)
throws SolrServerException, IOException {
if (ClientUtils.shouldApplyDefaultCollection(collection, request)) {
collection = getDefaultCollection();
}
return delegate.request(request, collection);
}

@Override
public void close() throws IOException {
/* No-op */
}
}
Expand Up @@ -556,7 +556,7 @@ public void onFailure(Response response, Throwable failure) {
public NamedList<Object> request(SolrRequest<?> solrRequest, String collection)
throws SolrServerException, IOException {
if (ClientUtils.shouldApplyDefaultCollection(collection, solrRequest))
collection = defaultCollection;
collection = getDefaultCollection();
String url = getRequestPath(solrRequest, collection);
Throwable abortCause = null;
Request req = null;
Expand Down
Expand Up @@ -245,7 +245,7 @@ public NamedList<Object> request(
final SolrRequest<?> request, final ResponseParser processor, String collection)
throws SolrServerException, IOException {
if (ClientUtils.shouldApplyDefaultCollection(collection, request))
collection = defaultCollection;
collection = getDefaultCollection();
HttpRequestBase method = createMethod(request, collection);
setBasicAuthHeader(request, method);
if (request.getHeaders() != null) {
Expand Down Expand Up @@ -297,7 +297,7 @@ public HttpUriRequestResponse httpUriRequest(
final SolrRequest<?> request, final ResponseParser processor)
throws SolrServerException, IOException {
HttpUriRequestResponse mrr = new HttpUriRequestResponse();
final HttpRequestBase method = createMethod(request, defaultCollection);
final HttpRequestBase method = createMethod(request, getDefaultCollection());
ExecutorService pool =
ExecutorUtil.newMDCAwareFixedThreadPool(1, new SolrNamedThreadFactory("httpUriRequest"));
try {
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.URLUtil;
import org.slf4j.MDC;

/**
Expand Down Expand Up @@ -94,7 +95,25 @@ private LBHttp2SolrClient(Builder builder) {

@Override
protected SolrClient getClient(String baseUrl) {
return solrClient;
final String effectiveDefaultCollection = getEffectiveDefaultCollection(baseUrl);
return new DelegatingSolrClient(solrClient) {
@Override
public String getDefaultCollection() {
return effectiveDefaultCollection;
}
};
}

private String getEffectiveDefaultCollection(String baseUrl) {
if (getDefaultCollection() != null) {
return getDefaultCollection();
} else if (!URLUtil.isBaseUrl(baseUrl)) {
return URLUtil.extractCoreFromCoreUrl(baseUrl);
} else if (solrClient.getDefaultCollection() != null) {
return solrClient.getDefaultCollection();
} else {
return null;
}
}

@Override
Expand Down Expand Up @@ -193,64 +212,65 @@ private Cancellable doRequest(
RetryListener listener) {
rsp.server = baseUrl;
req.getRequest().setBasePath(baseUrl);
return ((Http2SolrClient) getClient(baseUrl))
.asyncRequest(
req.getRequest(),
null,
new AsyncListener<>() {
@Override
public void onSuccess(NamedList<Object> result) {
rsp.rsp = result;
final String effectiveDefaultCollection =
req.getRequest().requiresCollection() ? getEffectiveDefaultCollection(baseUrl) : null;
return solrClient.asyncRequest(
req.getRequest(),
effectiveDefaultCollection,
new AsyncListener<>() {
@Override
public void onSuccess(NamedList<Object> result) {
rsp.rsp = result;
if (isZombie) {
zombieServers.remove(baseUrl);
}
listener.onSuccess(rsp);
}

@Override
public void onFailure(Throwable oe) {
try {
throw (Exception) oe;
} catch (BaseHttpSolrClient.RemoteExecutionException e) {
listener.onFailure(e, false);
} catch (SolrException e) {
// we retry on 404 or 403 or 503 or 500
// unless it's an update - then we only retry on connect exception
if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
// Server is alive but the request was likely malformed or invalid
if (isZombie) {
zombieServers.remove(baseUrl);
}
listener.onSuccess(rsp);
listener.onFailure(e, false);
}

@Override
public void onFailure(Throwable oe) {
try {
throw (Exception) oe;
} catch (BaseHttpSolrClient.RemoteExecutionException e) {
listener.onFailure(e, false);
} catch (SolrException e) {
// we retry on 404 or 403 or 503 or 500
// unless it's an update - then we only retry on connect exception
if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
// Server is alive but the request was likely malformed or invalid
if (isZombie) {
zombieServers.remove(baseUrl);
}
listener.onFailure(e, false);
}
} catch (SocketException e) {
if (!isNonRetryable || e instanceof ConnectException) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (SocketTimeoutException e) {
if (!isNonRetryable) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (SolrServerException e) {
Throwable rootCause = e.getRootCause();
if (!isNonRetryable && rootCause instanceof IOException) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else if (isNonRetryable && rootCause instanceof ConnectException) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (Exception e) {
listener.onFailure(new SolrServerException(e), false);
}
} catch (SocketException e) {
if (!isNonRetryable || e instanceof ConnectException) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (SocketTimeoutException e) {
if (!isNonRetryable) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (SolrServerException e) {
Throwable rootCause = e.getRootCause();
if (!isNonRetryable && rootCause instanceof IOException) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else if (isNonRetryable && rootCause instanceof ConnectException) {
listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
} else {
listener.onFailure(e, false);
}
});
} catch (Exception e) {
listener.onFailure(new SolrServerException(e), false);
}
}
});
}

public static class Builder {
Expand Down
Expand Up @@ -95,8 +95,8 @@ protected LBHttpSolrClient(Builder builder) {
? constructClient(builder.baseSolrUrls.toArray(new String[0]))
: builder.httpClient;
this.defaultCollection = builder.defaultCollection;
if (httpSolrClientBuilder != null && this.defaultCollection != null) {
httpSolrClientBuilder.defaultCollection = this.defaultCollection;
if (httpSolrClientBuilder != null && getDefaultCollection() != null) {
httpSolrClientBuilder.defaultCollection = getDefaultCollection();
}
this.connectionTimeoutMillis = builder.connectionTimeoutMillis;
this.soTimeoutMillis = builder.socketTimeoutMillis;
Expand Down Expand Up @@ -127,8 +127,8 @@ protected HttpSolrClient makeSolrClient(String server) {
.withHttpClient(httpClient)
.withConnectionTimeout(connectionTimeoutMillis, TimeUnit.MILLISECONDS)
.withSocketTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS);
if (defaultCollection != null) {
httpSolrClientBuilder.withDefaultCollection(defaultCollection);
if (getDefaultCollection() != null) {
httpSolrClientBuilder.withDefaultCollection(getDefaultCollection());
}
if (requestWriter != null) {
httpSolrClientBuilder.withRequestWriter(requestWriter);
Expand All @@ -149,8 +149,8 @@ protected HttpSolrClient makeSolrClient(String server) {
.withResponseParser(parser)
.withConnectionTimeout(connectionTimeoutMillis, TimeUnit.MILLISECONDS)
.withSocketTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS);
if (defaultCollection != null) {
clientBuilder.withDefaultCollection(defaultCollection);
if (getDefaultCollection() != null) {
clientBuilder.withDefaultCollection(getDefaultCollection());
}
if (requestWriter != null) {
clientBuilder.withRequestWriter(requestWriter);
Expand Down
Expand Up @@ -485,7 +485,9 @@ private void checkAZombieServer(ServerWrapper zombieServer) {
try {
QueryRequest queryRequest = new QueryRequest(solrQuery);
queryRequest.setBasePath(zombieServer.baseUrl);
QueryResponse resp = queryRequest.process(getClient(zombieServer.getBaseUrl()));

final SolrClient client = getClient(zombieServer.getBaseUrl());
QueryResponse resp = queryRequest.process(client);
if (resp.getStatus() == 0) {
// server has come back up.
// make sure to remove from zombies before adding to alive to avoid a race condition
Expand Down Expand Up @@ -576,7 +578,7 @@ public NamedList<Object> request(
int numServersTried = 0;
Map<String, ServerWrapper> justFailed = null;
if (ClientUtils.shouldApplyDefaultCollection(collection, request))
collection = defaultCollection;
collection = getDefaultCollection();

boolean timeAllowedExceeded = false;
long timeAllowedNano = getTimeAllowedInNanos(request);
Expand Down Expand Up @@ -621,6 +623,7 @@ public NamedList<Object> request(
try {
++numServersTried;
request.setBasePath(wrapper.baseUrl);

NamedList<Object> rsp = getClient(wrapper.baseUrl).request(request, collection);
// remove from zombie list *before* adding to alive to avoid a race that could lose a server
zombieServers.remove(wrapper.getBaseUrl());
Expand Down
Expand Up @@ -158,7 +158,7 @@ public void testSimple() throws Exception {
// Start the killed server once again
solr[1].startJetty();
// Wait for the alive check to complete
Thread.sleep(1200 * 5);
Thread.sleep(1200);
names.clear();
for (String ignored : solrUrls) {
resp = client.query(solrQuery);
Expand Down

0 comments on commit e70b2e2

Please sign in to comment.