Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17066: Switch HttpSolrClient away from core URLs, pt 1 #2173

Merged
29 changes: 21 additions & 8 deletions solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,12 @@ public final void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
}

/** Builds a new HttpSolrClient.Builder for use in recovery. Caller must close the client it builds */
private final HttpSolrClient.Builder recoverySolrClientBuilder(final String leaderUrl) {
private HttpSolrClient.Builder recoverySolrClientBuilder(String baseUrl, String leaderCoreName) {
// workaround for SOLR-13605: get the configured timeouts & set them directly
// (even though getRecoveryOnlyHttpClient() already has them set)
final UpdateShardHandlerConfig cfg = cc.getConfig().getUpdateShardHandlerConfig();
return (new HttpSolrClient.Builder(leaderUrl)
return (new HttpSolrClient.Builder(baseUrl)
.withDefaultCollection(leaderCoreName)
.withConnectionTimeout(cfg.getDistributedConnectionTimeout(), TimeUnit.MILLISECONDS)
.withSocketTimeout(cfg.getDistributedSocketTimeout(), TimeUnit.MILLISECONDS)
.withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient()));
Expand Down Expand Up @@ -216,16 +217,26 @@ protected String getReplicateLeaderUrl(ZkNodeProps leaderprops) {
return new ZkCoreNodeProps(leaderprops).getCoreUrl();
}

private final void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
/**
 * Looks up the base URL recorded in the given leader properties.
 *
 * @param leaderProps node properties describing the leader to replicate from
 * @return the result of {@link ZkCoreNodeProps#getBaseUrl()} for those properties
 */
protected String getReplicateLeaderBaseUrl(ZkNodeProps leaderProps) {
  final ZkCoreNodeProps leaderCoreProps = new ZkCoreNodeProps(leaderProps);
  return leaderCoreProps.getBaseUrl();
}

/**
 * Looks up the core name recorded in the given leader properties.
 *
 * @param leaderProps node properties describing the leader to replicate from
 * @return the result of {@link ZkCoreNodeProps#getCoreName()} for those properties
 */
protected String getReplicateLeaderCoreName(ZkNodeProps leaderProps) {
  final ZkCoreNodeProps leaderCoreProps = new ZkCoreNodeProps(leaderProps);
  return leaderCoreProps.getCoreName();
}

private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
throws SolrServerException, IOException {

final String leaderBaseUrl = getReplicateLeaderBaseUrl(leaderprops);
final String leaderCore = getReplicateLeaderCoreName(leaderprops);
final String leaderUrl = getReplicateLeaderUrl(leaderprops);

log.info("Attempting to replicate from [{}].", leaderUrl);
gerlowskija marked this conversation as resolved.
Show resolved Hide resolved

// send commit if replica could be a leader
if (replicaType.leaderEligible) {
commitOnLeader(leaderUrl);
commitOnLeader(leaderBaseUrl, leaderCore);
}

// use rep handler directly, so we can do this sync rather than async
Expand Down Expand Up @@ -281,8 +292,9 @@ private final void replicate(String nodeName, SolrCore core, ZkNodeProps leaderp
}
}

private final void commitOnLeader(String leaderUrl) throws SolrServerException, IOException {
try (SolrClient client = recoverySolrClientBuilder(leaderUrl).build()) {
private void commitOnLeader(String leaderBaseUrl, String coreName)
throws SolrServerException, IOException {
try (SolrClient client = recoverySolrClientBuilder(leaderBaseUrl, coreName).build()) {
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(new ModifiableSolrParams());
// ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
Expand Down Expand Up @@ -801,7 +813,8 @@ private final Replica pingLeader(
}

try (SolrClient httpSolrClient =
recoverySolrClientBuilder(leaderReplica.getCoreUrl()).build()) {
recoverySolrClientBuilder(leaderReplica.getBaseUrl(), leaderReplica.getCoreName())
.build()) {
httpSolrClient.ping();
return leaderReplica;
} catch (IOException e) {
Expand Down Expand Up @@ -906,7 +919,7 @@ private final void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreNa
conflictWaitMs
+ Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "8000"));
try (HttpSolrClient client =
recoverySolrClientBuilder(leaderBaseUrl)
recoverySolrClientBuilder(leaderBaseUrl, leaderCoreName)
.withSocketTimeout(readTimeout, TimeUnit.MILLISECONDS)
.build()) {
HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,13 @@ static void checkResults(String label, NamedList<Object> results, boolean failur

static void commit(NamedList<Object> results, String slice, Replica parentShardLeader) {
log.debug("Calling soft commit to make sub shard updates visible");
final var zkCoreProps = new ZkCoreNodeProps(parentShardLeader);
String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
// HttpShardHandler is hard coded to send a QueryRequest hence we go direct
// and we force open a searcher so that we have documents to show upon switching states
UpdateResponse updateResponse = null;
try {
updateResponse = softCommit(coreUrl);
updateResponse = softCommit(zkCoreProps.getBaseUrl(), zkCoreProps.getCoreName());
CollectionHandlingUtils.processResponse(
results, null, coreUrl, updateResponse, slice, Collections.emptySet());
} catch (Exception e) {
Expand All @@ -254,10 +255,12 @@ static void commit(NamedList<Object> results, String slice, Replica parentShardL
}
}

static UpdateResponse softCommit(String url) throws SolrServerException, IOException {
static UpdateResponse softCommit(String baseUrl, String coreName)
throws SolrServerException, IOException {

try (SolrClient client =
new HttpSolrClient.Builder(url)
new HttpSolrClient.Builder(baseUrl)
.withDefaultCollection(coreName)
.withConnectionTimeout(30000, TimeUnit.MILLISECONDS)
.withSocketTimeout(120000, TimeUnit.MILLISECONDS)
.build()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Objec
}
String chkCollection = CHK_COL_PREFIX + extCollection;
String daemonUrl = null;
Replica daemonReplica = null;
Exception exc = null;
boolean createdTarget = false;
try {
Expand Down Expand Up @@ -458,8 +459,8 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Objec
"Unable to copy documents from " + collection + " to " + targetCollection,
e);
}
daemonUrl = getDaemonUrl(rsp, coll);
if (daemonUrl == null) {
daemonReplica = getReplicaForDaemon(rsp, coll);
if (daemonReplica == null) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Unable to copy documents from "
Expand All @@ -469,13 +470,13 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Objec
+ ": "
+ Utils.toJSONString(rsp));
}
reindexingState.put("daemonUrl", daemonUrl);
reindexingState.put("daemonUrl", daemonReplica.getCoreUrl());
reindexingState.put("daemonName", targetCollection);
reindexingState.put(PHASE, "copying documents");
setReindexingState(collection, State.RUNNING, reindexingState);

// wait for the daemon to finish
waitForDaemon(targetCollection, daemonUrl, collection, targetCollection, reindexingState);
waitForDaemon(targetCollection, daemonReplica, collection, targetCollection, reindexingState);
if (maybeAbort(collection)) {
aborted = true;
return;
Expand Down Expand Up @@ -587,7 +588,7 @@ public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Objec
collection,
targetCollection,
chkCollection,
daemonUrl,
daemonReplica,
targetCollection,
createdTarget);
if (exc != null) {
Expand Down Expand Up @@ -667,7 +668,7 @@ private boolean maybeAbort(String collection) throws Exception {
}

// XXX see #waitForDaemon() for why we need this
private String getDaemonUrl(SolrResponse rsp, DocCollection coll) {
private Replica getReplicaForDaemon(SolrResponse rsp, DocCollection coll) {
@SuppressWarnings({"unchecked"})
Map<String, Object> rs = (Map<String, Object>) rsp.getResponse().get("result-set");
if (rs == null || rs.isEmpty()) {
Expand Down Expand Up @@ -711,7 +712,7 @@ private String getDaemonUrl(SolrResponse rsp, DocCollection coll) {
// find the replica whose core name matches replicaName
for (Replica r : coll.getReplicas()) {
if (replicaName.equals(r.getCoreName())) {
return r.getBaseUrl() + "/" + r.getCoreName();
gerlowskija marked this conversation as resolved.
Show resolved Hide resolved
return r;
}
}
return null;
Expand All @@ -723,14 +724,17 @@ private String getDaemonUrl(SolrResponse rsp, DocCollection coll) {
@SuppressWarnings({"unchecked"})
private void waitForDaemon(
String daemonName,
String daemonUrl,
Replica daemonReplica,
String sourceCollection,
String targetCollection,
Map<String, Object> reindexingState)
throws Exception {
HttpClient client = ccc.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
try (SolrClient solrClient =
new HttpSolrClient.Builder().withHttpClient(client).withBaseSolrUrl(daemonUrl).build()) {
new HttpSolrClient.Builder()
.withHttpClient(client)
.withBaseSolrUrl(daemonReplica.getBaseUrl())
.build()) {
ModifiableSolrParams q = new ModifiableSolrParams();
q.set(CommonParams.QT, "/stream");
q.set("action", "list");
Expand All @@ -742,7 +746,7 @@ private void waitForDaemon(
isRunning = false;
statusCheck++;
try {
NamedList<Object> rsp = solrClient.request(req);
NamedList<Object> rsp = solrClient.request(req, daemonReplica.getCoreName());
Map<String, Object> rs = (Map<String, Object>) rsp.get("result-set");
if (rs == null || rs.isEmpty()) {
throw new SolrException(
Expand Down Expand Up @@ -771,7 +775,7 @@ private void waitForDaemon(
} catch (Exception e) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Exception waiting for daemon " + daemonName + " at " + daemonUrl,
"Exception waiting for daemon " + daemonName + " at " + daemonReplica.getCoreUrl(),
e);
}
if (statusCheck % 5 == 0) {
Expand All @@ -784,11 +788,17 @@ private void waitForDaemon(
}

@SuppressWarnings({"unchecked"})
private void killDaemon(String daemonName, String daemonUrl) throws Exception {
log.debug("-- killing daemon {} at {}", daemonName, daemonUrl);
private void killDaemon(String daemonName, Replica daemonReplica) throws Exception {
if (log.isDebugEnabled()) {
log.debug("-- killing daemon {} at {}", daemonName, daemonReplica.getCoreUrl());
}
HttpClient client = ccc.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
try (SolrClient solrClient =
new HttpSolrClient.Builder().withHttpClient(client).withBaseSolrUrl(daemonUrl).build()) {
new HttpSolrClient.Builder()
.withHttpClient(client)
.withDefaultCollection(daemonReplica.getCoreName())
.withBaseSolrUrl(daemonReplica.getBaseUrl())
.build()) {
ModifiableSolrParams q = new ModifiableSolrParams();
q.set(CommonParams.QT, "/stream");
// we should really use 'kill' here, but then we will never
Expand Down Expand Up @@ -889,7 +899,7 @@ private void cleanup(
String collection,
String targetCollection,
String chkCollection,
String daemonUrl,
Replica daemonReplica,
String daemonName,
boolean createdTarget)
throws Exception {
Expand All @@ -898,8 +908,8 @@ private void cleanup(
// 2. cleanup target / chk collections IFF the source collection still exists and is not empty
// 3. cleanup collection state

if (daemonUrl != null) {
killDaemon(daemonName, daemonUrl);
if (daemonReplica != null) {
killDaemon(daemonName, daemonReplica);
}
ClusterState clusterState = ccc.getSolrCloudManager().getClusterState();
NamedList<Object> cmdResults = new NamedList<>();
Expand Down