From 2088d743db9304492ab19c14bc80c6187f172b79 Mon Sep 17 00:00:00 2001 From: Ishan Chattopadhyaya Date: Tue, 24 Oct 2023 00:49:00 +0530 Subject: [PATCH] SOLR-15694, SOLR-15715: Node roles and dedicated query coordinator nodes Co-authored-by: Noble Paul --- .../handler/component/AnalyticsComponent.java | 2 +- .../LTRFeatureLoggerTransformerFactory.java | 2 +- .../solr/ltr/search/LTRQParserPlugin.java | 2 +- .../solr/api/CoordinatorV2HttpSolrCall.java | 58 ++ .../solr/cloud/api/collections/Assign.java | 29 +- .../api/collections/CategoryRoutedAlias.java | 2 +- .../api/collections/CreateCollectionCmd.java | 7 +- .../cloud/api/collections/RestoreCmd.java | 8 +- .../cloud/api/collections/RoutedAlias.java | 2 +- .../api/collections/TimeRoutedAlias.java | 2 +- .../org/apache/solr/core/CoreContainer.java | 1 + .../java/org/apache/solr/core/NodeRoles.java | 152 +++ .../org/apache/solr/handler/ClusterAPI.java | 129 +++ .../solr/handler/RequestHandlerBase.java | 4 +- .../solr/handler/SolrConfigHandler.java | 10 +- .../handler/admin/ShowFileRequestHandler.java | 4 +- .../handler/component/HttpShardHandler.java | 4 +- .../component/HttpShardHandlerFactory.java | 2 +- .../handler/component/QueryComponent.java | 2 +- .../component/RealTimeGetComponent.java | 10 +- .../solr/handler/component/SearchHandler.java | 4 +- .../request/DelegatingSolrQueryRequest.java | 162 ++++ .../org/apache/solr/request/SimpleFacets.java | 2 +- .../apache/solr/request/SolrQueryRequest.java | 14 +- .../transform/CoreAugmenterFactory.java | 29 + .../transform/TransformerFactory.java | 2 + .../apache/solr/search/JoinQParserPlugin.java | 2 +- .../search/join/ScoreJoinQParserPlugin.java | 4 +- .../solr/servlet/CoordinatorHttpSolrCall.java | 352 +++++++ .../org/apache/solr/servlet/HttpSolrCall.java | 9 + .../solr/servlet/SolrDispatchFilter.java | 35 +- .../org/apache/solr/update/UpdateLog.java | 2 +- .../DistributedUpdateProcessorFactory.java | 2 +- .../processor/RoutedAliasUpdateProcessor.java | 10 +- .../configsets/cache-control/conf/schema.xml | 27 + .../cache-control/conf/solrconfig.xml | 54 ++ .../solr/configsets/conf3/conf/schema.xml | 43 + .../solr/configsets/conf3/conf/solrconfig.xml | 68 ++ .../common/cloud/ZkStateReaderAccessor.java | 5 +- .../solr/search/TestCoordinatorRole.java | 888 ++++++++++++++++++ .../apache/solr/client/solrj/SolrRequest.java | 13 +- .../solrj/impl/BaseCloudSolrClient.java | 15 + .../solr/common/cloud/ZkStateReader.java | 10 + .../java/org/apache/solr/SolrTestCaseJ4.java | 14 + 44 files changed, 2147 insertions(+), 51 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/api/CoordinatorV2HttpSolrCall.java create mode 100644 solr/core/src/java/org/apache/solr/core/NodeRoles.java create mode 100644 solr/core/src/java/org/apache/solr/request/DelegatingSolrQueryRequest.java create mode 100644 solr/core/src/java/org/apache/solr/response/transform/CoreAugmenterFactory.java create mode 100644 solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java create mode 100644 solr/core/src/test-files/solr/configsets/cache-control/conf/schema.xml create mode 100644 solr/core/src/test-files/solr/configsets/cache-control/conf/solrconfig.xml create mode 100644 solr/core/src/test-files/solr/configsets/conf3/conf/schema.xml create mode 100644 solr/core/src/test-files/solr/configsets/conf3/conf/solrconfig.xml create mode 100644 solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java diff --git a/solr/contrib/analytics/src/java/org/apache/solr/handler/component/AnalyticsComponent.java b/solr/contrib/analytics/src/java/org/apache/solr/handler/component/AnalyticsComponent.java index 5248181b76bd..4017b6b05715 100644 --- a/solr/contrib/analytics/src/java/org/apache/solr/handler/component/AnalyticsComponent.java +++ b/solr/contrib/analytics/src/java/org/apache/solr/handler/component/AnalyticsComponent.java @@ -109,7 +109,7 @@ public int distributedProcess(ResponseBuilder rb) throws IOException { // Send out a request to each shard and merge the responses into our AnalyticsRequestManager reqManager.shardStream.sendRequests(rb.req.getCore().getCoreDescriptor().getCollectionName(), - rb.req.getCore().getCoreContainer().getZkController().getZkServerAddress()); + rb.req.getCoreContainer().getZkController().getZkServerAddress()); reqManager.sendShards = false; diff --git a/solr/contrib/ltr/src/java/org/apache/solr/ltr/response/transform/LTRFeatureLoggerTransformerFactory.java b/solr/contrib/ltr/src/java/org/apache/solr/ltr/response/transform/LTRFeatureLoggerTransformerFactory.java index 96bd795ef3a8..8f2aaa5796c0 100644 --- a/solr/contrib/ltr/src/java/org/apache/solr/ltr/response/transform/LTRFeatureLoggerTransformerFactory.java +++ b/solr/contrib/ltr/src/java/org/apache/solr/ltr/response/transform/LTRFeatureLoggerTransformerFactory.java @@ -217,7 +217,7 @@ public void setContext(ResultContext context) { } leafContexts = searcher.getTopReaderContext().leaves(); if (threadManager != null) { - threadManager.setExecutor(context.getRequest().getCore().getCoreContainer().getUpdateShardHandler().getUpdateExecutor()); + threadManager.setExecutor(context.getRequest().getCoreContainer().getUpdateShardHandler().getUpdateExecutor()); } rerankingQueriesFromContext = SolrQueryRequestContextUtils.getScoringQueries(req); diff --git a/solr/contrib/ltr/src/java/org/apache/solr/ltr/search/LTRQParserPlugin.java b/solr/contrib/ltr/src/java/org/apache/solr/ltr/search/LTRQParserPlugin.java index 0bc0b678c2b6..39e4828649de 100644 --- a/solr/contrib/ltr/src/java/org/apache/solr/ltr/search/LTRQParserPlugin.java +++ b/solr/contrib/ltr/src/java/org/apache/solr/ltr/search/LTRQParserPlugin.java @@ -153,7 +153,7 @@ public LTRQParser(String qstr, SolrParams localParams, SolrParams params, @Override public Query parse() throws SyntaxError { if (threadManager != null) { - threadManager.setExecutor(req.getCore().getCoreContainer().getUpdateShardHandler().getUpdateExecutor()); + threadManager.setExecutor(req.getCoreContainer().getUpdateShardHandler().getUpdateExecutor()); } // ReRanking Model final String[] modelNames = localParams.getParams(LTRQParserPlugin.MODEL); diff --git a/solr/core/src/java/org/apache/solr/api/CoordinatorV2HttpSolrCall.java b/solr/core/src/java/org/apache/solr/api/CoordinatorV2HttpSolrCall.java new file mode 100644 index 000000000000..f29d2407022d --- /dev/null +++ b/solr/core/src/java/org/apache/solr/api/CoordinatorV2HttpSolrCall.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.api; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; +import org.apache.solr.servlet.CoordinatorHttpSolrCall; +import org.apache.solr.servlet.SolrDispatchFilter; + +public class CoordinatorV2HttpSolrCall extends V2HttpCall { + private String collectionName; + CoordinatorHttpSolrCall.Factory factory; + + public CoordinatorV2HttpSolrCall( + CoordinatorHttpSolrCall.Factory factory, + SolrDispatchFilter solrDispatchFilter, + CoreContainer cc, + HttpServletRequest request, + HttpServletResponse response, + boolean retry) { + super(solrDispatchFilter, cc, request, response, retry); + this.factory = factory; + } + + @Override + protected SolrCore getCoreByCollection(String collectionName, boolean isPreferLeader) { + this.collectionName = collectionName; + SolrCore core = super.getCoreByCollection(collectionName, isPreferLeader); + if (core != null) return core; + if (!path.endsWith("/select")) return null; + return CoordinatorHttpSolrCall.getCore(factory, this, collectionName, isPreferLeader); + } + + @Override + protected void init() throws Exception { + super.init(); + if (action == SolrDispatchFilter.Action.PROCESS && core != null) { + solrReq = CoordinatorHttpSolrCall.wrappedReq(solrReq, collectionName, this); + } + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java index 138f5afc34e3..8728cd3fab73 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -55,6 +56,8 @@ import org.apache.solr.common.params.CollectionAdminParams; import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.Utils; +import org.apache.solr.core.NodeRoles; +import org.apache.solr.handler.ClusterAPI; import org.apache.solr.util.NumberUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -233,7 +236,12 @@ private static boolean existCoreName(String coreName, Slice slice) { return false; } - public static List getLiveOrLiveAndCreateNodeSetList(final Set liveNodes, final ZkNodeProps message, final Random random) { + public static List getLiveOrLiveAndCreateNodeSetList( + final Set liveNodes, + final ZkNodeProps message, + final Random random, + DistribStateManager zk) { + List nodeList; final String createNodeSetStr = message.getStr(CREATE_NODE_SET); final List createNodeList = (createNodeSetStr == null) ? null : @@ -248,13 +256,28 @@ public static List getLiveOrLiveAndCreateNodeSetList(final Set l Collections.shuffle(nodeList, random); } } else { - nodeList = new ArrayList<>(liveNodes); + nodeList = new ArrayList<>(filterNonDataNodes(zk, liveNodes)); Collections.shuffle(nodeList, random); } return nodeList; } - + public static Collection filterNonDataNodes( + DistribStateManager zk, Collection liveNodes) { + try { + List noData = ClusterAPI.getNodesByRole(NodeRoles.Role.DATA, NodeRoles.MODE_OFF, zk); + if (noData.isEmpty()) { + return liveNodes; + } else { + liveNodes = new HashSet<>(liveNodes); + liveNodes.removeAll(noData); + return liveNodes; + } + } catch (Exception e) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, "Error fetching roles from Zookeeper", e); + } + } /** * Note: where possible, the {@link #usePolicyFramework(DocCollection, SolrCloudManager)} method should * be used instead of this method diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java index 0ba96c39716f..24a2b4451c07 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CategoryRoutedAlias.java @@ -123,7 +123,7 @@ public RoutedAliasTypes getRoutedAliasType() { @Override public void validateRouteValue(AddUpdateCommand cmd) throws SolrException { if (this.aliases == null) { - updateParsedCollectionAliases(cmd.getReq().getCore().getCoreContainer().getZkController().zkStateReader, false); + updateParsedCollectionAliases(cmd.getReq().getCoreContainer().getZkController().zkStateReader, false); } Object fieldValue = cmd.getSolrInputDocument().getFieldValue(getRouteField()); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java index 79ebfed5720c..d9375ae46851 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java @@ -459,7 +459,12 @@ public static List buildReplicaPositions(SolrCloudManager cloud // but (for now) require that each core goes on a distinct node. List replicaPositions; - List nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM); + List nodeList = + Assign.getLiveOrLiveAndCreateNodeSetList( + clusterState.getLiveNodes(), + message, + OverseerCollectionMessageHandler.RANDOM, + cloudManager.getDistribStateManager()); if (nodeList.isEmpty()) { log.warn("It is unusual to create a collection ({}) without cores.", collectionName); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java index 987d0d741bef..df082278c73c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java @@ -189,8 +189,12 @@ private RestoreContext(ZkNodeProps message, OverseerCollectionMessageHandler ocm this.backupCollectionState = this.backupManager.readCollectionState(this.backupCollection); this.shardHandler = ocmh.shardHandlerFactory.getShardHandler(); - this.nodeList = Assign.getLiveOrLiveAndCreateNodeSetList( - zkStateReader.getClusterState().getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM); + this.nodeList = + Assign.getLiveOrLiveAndCreateNodeSetList( + zkStateReader.getClusterState().getLiveNodes(), + message, + OverseerCollectionMessageHandler.RANDOM, + container.getZkController().getSolrCloudManager().getDistribStateManager()); } @Override diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java index cb53e2341640..bd09dc37826d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RoutedAlias.java @@ -324,7 +324,7 @@ private String createAllRequiredCollections(AddUpdateCommand cmd, CandidateColle SolrQueryRequest req = cmd.getReq(); SolrCore core = req.getCore(); - CoreContainer coreContainer = core.getCoreContainer(); + CoreContainer coreContainer = req.getCoreContainer(); do { switch (targetCollectionDesc.getCreationType()) { case NONE: diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java index afe1dd7923a6..f8fa95c7bb00 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/TimeRoutedAlias.java @@ -373,7 +373,7 @@ public void validateRouteValue(AddUpdateCommand cmd) throws SolrException { } catch (DateTimeParseException e) { startTime = DateMathParser.parseMath(new Date(), start).toInstant(); SolrCore core = cmd.getReq().getCore(); - ZkStateReader zkStateReader = core.getCoreContainer().getZkController().zkStateReader; + ZkStateReader zkStateReader = cmd.getReq().getCoreContainer().getZkController().zkStateReader; Aliases aliases = zkStateReader.getAliases(); Map props = new HashMap<>(aliases.getCollectionAliasProperties(aliasName)); start = DateTimeFormatter.ISO_INSTANT.format(startTime); diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 7d217b6dbb84..513d912fda0d 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -249,6 +249,7 @@ public CoreLoadFailure(CoreDescriptor cd, Exception loadFailure) { protected volatile AutoscalingHistoryHandler autoscalingHistoryHandler; private volatile SolrClientCache solrClientCache; + public final NodeRoles nodeRoles = new NodeRoles(System.getProperty(NodeRoles.NODE_ROLES_PROP)); private final ObjectCache objectCache = new ObjectCache(); diff --git a/solr/core/src/java/org/apache/solr/core/NodeRoles.java b/solr/core/src/java/org/apache/solr/core/NodeRoles.java new file mode 100644 index 000000000000..abab48b1db0c --- /dev/null +++ b/solr/core/src/java/org/apache/solr/core/NodeRoles.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.core; + +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.StringUtils; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.StrUtils; + +public class NodeRoles { + public static final String NODE_ROLES_PROP = "solr.node.roles"; + + /** Roles to be assumed on nodes that don't have roles specified for them at startup */ + public static final String DEFAULT_ROLES_STRING = "data:on,overseer:allowed"; + + // Map of roles to mode that are applicable for this node. + private Map nodeRoles; + + public NodeRoles(String rolesString) { + Map roles = new EnumMap<>(Role.class); + if (StringUtils.isEmpty(rolesString)) { + rolesString = DEFAULT_ROLES_STRING; + } + List rolesList = StrUtils.splitSmart(rolesString, ','); + for (String s : rolesList) { + List roleMode = StrUtils.splitSmart(s, ':'); + Role r = Role.getRole(roleMode.get(0)); + String m = roleMode.get(1); + if (r.supportedModes().contains(m)) { + roles.put(r, m); + } else { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Unknown role mode '" + roleMode.get(1) + "' for role '" + r + "'"); + } + } + for (Role r : Role.values()) { + if (!roles.containsKey(r)) { + roles.put(r, r.modeWhenRoleIsAbsent()); + } + } + nodeRoles = Collections.unmodifiableMap(roles); + } + + public Map getRoles() { + return nodeRoles; + } + + public String getRoleMode(Role role) { + return nodeRoles.get(role); + } + + public boolean isOverseerAllowedOrPreferred() { + String roleMode = nodeRoles.get(Role.OVERSEER); + return MODE_ALLOWED.equals(roleMode) || MODE_PREFERRED.equals(roleMode); + } + + public static final String MODE_ON = "on"; + public static final String MODE_OFF = "off"; + public static final String MODE_ALLOWED = "allowed"; + public static final String MODE_PREFERRED = "preferred"; + public static final String MODE_DISALLOWED = "disallowed"; + + public enum Role { + DATA("data") { + @Override + public Set supportedModes() { + return ImmutableSet.of(MODE_ON, MODE_OFF); + } + + @Override + public String modeWhenRoleIsAbsent() { + return MODE_OFF; + } + }, + OVERSEER("overseer") { + @Override + public Set supportedModes() { + return ImmutableSet.of(MODE_ALLOWED, MODE_PREFERRED, MODE_DISALLOWED); + } + + @Override + public String modeWhenRoleIsAbsent() { + return MODE_DISALLOWED; + } + }, + + COORDINATOR("coordinator") { + @Override + public String modeWhenRoleIsAbsent() { + return MODE_OFF; + } + + @Override + public Set supportedModes() { + return ImmutableSet.of(MODE_ON, MODE_OFF); + } + }; + + public final String roleName; + + Role(String name) { + this.roleName = name; + } + + public static Role getRole(String value) { + for (Role role : Role.values()) { + if (value.equals(role.roleName)) return role; + } + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown role: " + value); + } + + public abstract Set supportedModes(); + + /** Default mode for a role in nodes where this role is not specified. */ + public abstract String modeWhenRoleIsAbsent(); + + @Override + public String toString() { + return roleName; + } + } + + public static String getZNodeForRole(Role role) { + return ZkStateReader.NODE_ROLES + "/" + role.roleName; + } + + public static String getZNodeForRoleMode(Role role, String mode) { + return ZkStateReader.NODE_ROLES + "/" + role.roleName + "/" + mode; + } +} diff --git a/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java b/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java index a1a6d6f3ba52..02330e1ecdb0 100644 --- a/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java +++ b/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java @@ -17,18 +17,26 @@ package org.apache.solr.handler; +import java.io.IOException; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import com.google.common.collect.ImmutableMap; import org.apache.solr.api.Command; import org.apache.solr.api.EndPoint; import org.apache.solr.api.PayloadObj; +import org.apache.solr.client.solrj.cloud.DistribStateManager; import org.apache.solr.client.solrj.request.beans.ClusterPropInfo; import org.apache.solr.client.solrj.request.beans.CreateConfigInfo; import org.apache.solr.cloud.OverseerConfigSetMessageHandler; import org.apache.solr.common.SolrException; import org.apache.solr.common.annotation.JsonProperty; import org.apache.solr.common.cloud.ClusterProperties; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ConfigSetParams; import org.apache.solr.common.params.DefaultSolrParams; @@ -36,10 +44,12 @@ import org.apache.solr.common.util.ReflectMapWriter; import org.apache.solr.common.util.Utils; import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.NodeRoles; import org.apache.solr.handler.admin.CollectionsHandler; import org.apache.solr.handler.admin.ConfigSetsHandler; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; +import org.apache.zookeeper.KeeperException; import static org.apache.solr.client.solrj.SolrRequest.METHOD.DELETE; import static org.apache.solr.client.solrj.SolrRequest.METHOD.GET; @@ -69,7 +79,126 @@ public ClusterAPI(CollectionsHandler ch, ConfigSetsHandler configSetsHandler) { this.collectionsHandler = ch; this.configSetsHandler = configSetsHandler; } + @EndPoint(method = GET, path = "/cluster/node-roles", permission = COLL_READ_PERM) + public void roles(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { + rsp.add( + "node-roles", + readRecursive( + ZkStateReader.NODE_ROLES, + collectionsHandler + .getCoreContainer() + .getZkController() + .getSolrCloudManager() + .getDistribStateManager(), + 3)); + } + + Object readRecursive(String path, DistribStateManager zk, int depth) { + if (depth == 0) return null; + Map result; + try { + List children = zk.listData(path); + if (children != null && !children.isEmpty()) { + result = new HashMap<>(); + } else { + return Collections.emptySet(); + } + for (String child : children) { + Object c = readRecursive(path + "/" + child, zk, depth - 1); + result.put(child, c); + } + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + if (depth == 1) { + return result.keySet(); + } else { + return result; + } + } + + @EndPoint(method = GET, path = "/cluster/node-roles/role/{role}", permission = COLL_READ_PERM) + public void nodesWithRole(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { + String role = req.getPathTemplateValues().get("role"); + rsp.add( + "node-roles", + ImmutableMap.of( + role, + readRecursive( + ZkStateReader.NODE_ROLES + "/" + role, + collectionsHandler + .getCoreContainer() + .getZkController() + .getSolrCloudManager() + .getDistribStateManager(), + 2))); + } + + @EndPoint(method = GET, path = "/cluster/node-roles/node/{node}", permission = COLL_READ_PERM) + @SuppressWarnings("unchecked") + public void rolesForNode(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { + String node = req.getPathTemplateValues().get("node"); + Map ret = new HashMap<>(); + Map>> roles = + (Map>>) + readRecursive( + ZkStateReader.NODE_ROLES, + collectionsHandler + .getCoreContainer() + .getZkController() + .getSolrCloudManager() + .getDistribStateManager(), + 3); + for (String role : roles.keySet()) { + for (String mode : roles.get(role).keySet()) { + if (roles.get(role).get(mode).isEmpty()) continue; + Set nodes = roles.get(role).get(mode); + if (nodes.contains(node)) ret.put(role, mode); + } + } + for (String role : ret.keySet()) { + rsp.add(role, ret.get(role)); + } + } + + @EndPoint(method = GET, path = "/cluster/node-roles/supported", permission = COLL_READ_PERM) + public void supportedRoles(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { + Map roleModesSupportedMap = new HashMap<>(); + for (NodeRoles.Role role : NodeRoles.Role.values()) { + roleModesSupportedMap.put(role.toString(), ImmutableMap.of("modes", role.supportedModes())); + } + rsp.add("supported-roles", roleModesSupportedMap); + } + @EndPoint( + method = GET, + path = "/cluster/node-roles/role/{role}/{mode}", + permission = COLL_READ_PERM) + public void nodesWithRoleMode(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { + // Here, deal with raw strings instead of Role & Mode types so as to handle roles and modes + // that are not understood by this node (possibly during a rolling upgrade) + String roleStr = req.getPathTemplateValues().get("role"); + String modeStr = req.getPathTemplateValues().get("mode"); + + List nodes = + collectionsHandler + .getCoreContainer() + .getZkController() + .getSolrCloudManager() + .getDistribStateManager() + .listData(ZkStateReader.NODE_ROLES + "/" + roleStr + "/" + modeStr); + rsp.add("node-roles", ImmutableMap.of(roleStr, Collections.singletonMap(modeStr, nodes))); + } + + public static List getNodesByRole( + NodeRoles.Role role, String mode, DistribStateManager zk) + throws InterruptedException, IOException, KeeperException { + try { + return zk.listData(ZkStateReader.NODE_ROLES + "/" + role + "/" + mode); + } catch (NoSuchElementException e) { + return Collections.emptyList(); + } + } @EndPoint(method = GET, path = "/cluster/aliases", permission = COLL_READ_PERM) diff --git a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java index 080a696acc9f..3bdcf01138c8 100644 --- a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java +++ b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java @@ -192,7 +192,7 @@ public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) { requests.inc(); // requests are distributed by default when ZK is in use, unless indicated otherwise boolean distrib = req.getParams().getBool(CommonParams.DISTRIB, - req.getCore() != null ? req.getCore().getCoreContainer().isZooKeeperAware() : false); + req.getCore() != null ? req.getCoreContainer().isZooKeeperAware() : false); if (req.getParams().getBool(ShardParams.IS_SHARD, false)) { shardPurposes.computeIfAbsent("total", name -> new Counter()).inc(); int purpose = req.getParams().getInt(ShardParams.SHARDS_PURPOSE, 0); @@ -226,7 +226,7 @@ public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) { } } catch (Exception e) { if (req.getCore() != null) { - boolean isTragic = req.getCore().getCoreContainer().checkTragicException(req.getCore()); + boolean isTragic = req.getCoreContainer().checkTragicException(req.getCore()); if (isTragic) { if (e instanceof SolrException) { // Tragic exceptions should always throw a server error diff --git a/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java b/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java index 03579c6321f9..26bf4af349e7 100644 --- a/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java @@ -487,8 +487,8 @@ private void handleParams(ArrayList ops, RequestParams params) params.getZnodeVersion(), RequestParams.RESOURCE, params.toByteArray(), true); log.debug("persisted to version : {} ", latestVersion); - waitForAllReplicasState(req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName(), - req.getCore().getCoreContainer().getZkController(), RequestParams.NAME, latestVersion, 30); + waitForAllReplicasState(req.getCloudDescriptor().getCollectionName(), + req.getCoreContainer().getZkController(), RequestParams.NAME, latestVersion, 30); } } else { @@ -547,13 +547,13 @@ private void handleCommands(List ops, ConfigOverlay overlay) t int latestVersion = ZkController.persistConfigResourceToZooKeeper((ZkSolrResourceLoader) loader, overlay.getZnodeVersion(), ConfigOverlay.RESOURCE_NAME, overlay.toByteArray(), true); log.debug("Executed config commands successfully and persisted to ZK {}", ops); - waitForAllReplicasState(req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName(), - req.getCore().getCoreContainer().getZkController(), + waitForAllReplicasState(req.getCloudDescriptor().getCollectionName(), + req.getCoreContainer().getZkController(), ConfigOverlay.NAME, latestVersion, 30); } else { SolrResourceLoader.persistConfLocally(loader, ConfigOverlay.RESOURCE_NAME, overlay.toByteArray()); - req.getCore().getCoreContainer().reload(req.getCore().getName(), req.getCore().uniqueId); + req.getCoreContainer().reload(req.getCore().getName(), req.getCore().uniqueId); log.info("Executed config commands successfully and persisted to File System {}", ops); } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ShowFileRequestHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/ShowFileRequestHandler.java index 9736190bfa09..a51bd284d969 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/ShowFileRequestHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/ShowFileRequestHandler.java @@ -138,7 +138,7 @@ public static Set initHidden(SolrParams invariants) { public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws InterruptedException, KeeperException, IOException { - CoreContainer coreContainer = req.getCore().getCoreContainer(); + CoreContainer coreContainer = req.getCoreContainer(); if (coreContainer.isZooKeeperAware()) { showFromZooKeeper(req, rsp, coreContainer); } else { @@ -377,7 +377,7 @@ public static File getAdminFileFromFileSystem(SolrQueryRequest req, SolrQueryRes } // A leading slash is unnecessary but supported and interpreted as start of config dir Path filePath = configdir.toPath().resolve(fname.startsWith("/") ? fname.substring(1) : fname); - req.getCore().getCoreContainer().assertPathAllowed(filePath); + req.getCoreContainer().assertPathAllowed(filePath); if (!filePath.normalize().startsWith(configdir.toPath().normalize())) { log.error("Path must be inside core config directory"); rsp.setException( diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java index 885a272a6698..3f138cd759ec 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java @@ -156,8 +156,8 @@ public void prepDistributed(ResponseBuilder rb) { final String shards = params.get(ShardParams.SHARDS); CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor(); - CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor(); - ZkController zkController = req.getCore().getCoreContainer().getZkController(); + CloudDescriptor cloudDescriptor = req.getCloudDescriptor(); + ZkController zkController = req.getCoreContainer().getZkController(); final ReplicaListTransformer replicaListTransformer = httpShardHandlerFactory.getReplicaListTransformer(req); diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java index a2382ea9d8a2..8fc980efc096 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java @@ -400,7 +400,7 @@ protected ReplicaListTransformer getReplicaListTransformer(final SolrQueryReques final SolrParams params = req.getParams(); final SolrCore core = req.getCore(); // explicit check for null core (temporary?, for tests) @SuppressWarnings("resource") - ZkController zkController = core == null ? null : core.getCoreContainer().getZkController(); + ZkController zkController = req.getCoreContainer().getZkController(); if (zkController != null) { return requestReplicaListTransformerGenerator.getReplicaListTransformer( params, diff --git a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java index 934ca0aecf80..4b40f64fc7fb 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java @@ -277,7 +277,7 @@ protected void prepareGrouping(ResponseBuilder rb) throws IOException { groupingSpec.setResponseFormat(responseFormat); // See SOLR-12249. Disallow grouping on text fields that are not SortableText in cloud mode - if (req.getCore().getCoreContainer().isZooKeeperAware()) { + if (req.getCoreContainer().isZooKeeperAware()) { IndexSchema schema = rb.req.getSchema(); String[] fields = params.getParams(GroupParams.GROUP_FIELD); if (fields != null) { diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index faf1093a46f6..bfc85ce5dacd 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -121,7 +121,7 @@ public void process(ResponseBuilder rb) throws IOException SolrQueryRequest req = rb.req; SolrQueryResponse rsp = rb.rsp; SolrParams params = req.getParams(); - CloudDescriptor cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor(); + CloudDescriptor cloudDesc = req.getCloudDescriptor(); if (cloudDesc != null) { Replica.Type replicaType = cloudDesc.getReplicaType(); @@ -173,7 +173,7 @@ public void process(ResponseBuilder rb) throws IOException try { if (log.isDebugEnabled()) { log.debug("{} min count to sync to (from most recent searcher view) {}" - , req.getCore().getCoreContainer().getZkController().getNodeName() + , req.getCoreContainer().getZkController().getNodeName() , searcher.count(new MatchAllDocsQuery())); } } finally { @@ -987,11 +987,11 @@ public int createSubRequests(ResponseBuilder rb) throws IOException { // TODO: handle collection=...? - ZkController zkController = rb.req.getCore().getCoreContainer().getZkController(); + ZkController zkController = rb.req.getCoreContainer().getZkController(); // if shards=... then use that if (zkController != null && params.get(ShardParams.SHARDS) == null) { - CloudDescriptor cloudDescriptor = rb.req.getCore().getCoreDescriptor().getCloudDescriptor(); + CloudDescriptor cloudDescriptor = rb.req.getCloudDescriptor(); String collection = cloudDescriptor.getCollectionName(); ClusterState clusterState = zkController.getClusterState(); @@ -1220,7 +1220,7 @@ public void processSync(ResponseBuilder rb, int nVersions, String sync) { boolean onlyIfActive = rb.req.getParams().getBool("onlyIfActive", false); if (onlyIfActive) { - if (rb.req.getCore().getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) { + if (rb.req.getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) { log.info("Last published state was not ACTIVE, cannot sync."); rb.rsp.add("sync", "false"); return; diff --git a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java index 660e1c75c089..7a5436f059fb 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java @@ -240,8 +240,8 @@ public List getComponents() { private ShardHandler getAndPrepShardHandler(SolrQueryRequest req, ResponseBuilder rb) { ShardHandler shardHandler = null; - CoreContainer cc = req.getCore().getCoreContainer(); - boolean isZkAware = cc.isZooKeeperAware(); + CoreContainer cc = req.getCoreContainer(); + boolean isZkAware = req.getCoreContainer().isZooKeeperAware(); rb.isDistrib = req.getParams().getBool(DISTRIB, isZkAware); if (!rb.isDistrib) { // for back compat, a shards param with URLs like localhost:8983/solr will mean that this diff --git a/solr/core/src/java/org/apache/solr/request/DelegatingSolrQueryRequest.java b/solr/core/src/java/org/apache/solr/request/DelegatingSolrQueryRequest.java new file mode 100644 index 000000000000..d17412fdbf26 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/request/DelegatingSolrQueryRequest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.request; + +import io.opentracing.Span; +import io.opentracing.Tracer; +import java.security.Principal; +import java.util.List; +import java.util.Map; +import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.CommandOperation; +import org.apache.solr.common.util.ContentStream; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.search.SolrIndexSearcher; +import org.apache.solr.servlet.HttpSolrCall; +import org.apache.solr.util.RTimerTree; + +/** + * A {@link SolrQueryRequest} implementation that defers to a delegate in all cases. + * + *

Used primarily in cases where developers want to customize one or more SolrQueryRequest + * methods while deferring the remainder to an existing instances. + */ +public class DelegatingSolrQueryRequest implements SolrQueryRequest { + private final SolrQueryRequest delegate; + + public DelegatingSolrQueryRequest(SolrQueryRequest delegate) { + this.delegate = delegate; + } + + @Override + public SolrParams getParams() { + return delegate.getParams(); + } + + @Override + public void setParams(SolrParams params) { + delegate.setParams(params); + } + + @Override + public Iterable getContentStreams() { + return delegate.getContentStreams(); + } + + @Override + public SolrParams getOriginalParams() { + return delegate.getOriginalParams(); + } + + @Override + public Map getContext() { + return delegate.getContext(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public long getStartTime() { + return delegate.getStartTime(); + } + + @Override + public RTimerTree getRequestTimer() { + return delegate.getRequestTimer(); + } + + @Override + public SolrIndexSearcher getSearcher() { + return delegate.getSearcher(); + } + + @Override + public SolrCore getCore() { + return delegate.getCore(); + } + + @Override + public IndexSchema getSchema() { + return delegate.getSchema(); + } + + @Override + public void updateSchemaToLatest() { + delegate.updateSchemaToLatest(); + } + + @Override + public String getParamString() { + return delegate.getParamString(); + } + + @Override + public Map getJSON() { + return delegate.getJSON(); + } + + @Override + public void setJSON(Map json) { + delegate.setJSON(json); + } + + @Override + public Principal getUserPrincipal() { + return delegate.getUserPrincipal(); + } + + @Override + public String getPath() { + return delegate.getPath(); + } + + @Override + public Map getPathTemplateValues() { + return delegate.getPathTemplateValues(); + } + + @Override + public List getCommands(boolean validateInput) { + return delegate.getCommands(validateInput); + } + + @Override + public String getHttpMethod() { + return delegate.getHttpMethod(); + } + + @Override + public HttpSolrCall getHttpSolrCall() { + return delegate.getHttpSolrCall(); + } + + @Override + public CoreContainer getCoreContainer() { + return delegate.getCoreContainer(); + } + + @Override + public CloudDescriptor getCloudDescriptor() { + return delegate.getCloudDescriptor(); + } +} diff --git a/solr/core/src/java/org/apache/solr/request/SimpleFacets.java b/solr/core/src/java/org/apache/solr/request/SimpleFacets.java index 919f2e4650d5..347f18eb37fc 100644 --- a/solr/core/src/java/org/apache/solr/request/SimpleFacets.java +++ b/solr/core/src/java/org/apache/solr/request/SimpleFacets.java @@ -168,7 +168,7 @@ public SimpleFacets(SolrQueryRequest req, this.docsOrig = docs; this.global = params; this.rb = rb; - this.facetExecutor = req.getCore().getCoreContainer().getUpdateShardHandler().getUpdateExecutor(); + this.facetExecutor = req.getCoreContainer().getUpdateShardHandler().getUpdateExecutor(); } public void setFacetDebugInfo(FacetDebugInfo fdebugParent) { diff --git a/solr/core/src/java/org/apache/solr/request/SolrQueryRequest.java b/solr/core/src/java/org/apache/solr/request/SolrQueryRequest.java index 2c5090ba16a7..17e22713893e 100644 --- a/solr/core/src/java/org/apache/solr/request/SolrQueryRequest.java +++ b/solr/core/src/java/org/apache/solr/request/SolrQueryRequest.java @@ -16,11 +16,15 @@ */ package org.apache.solr.request; + import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.schema.IndexSchema; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.ContentStream; +import org.apache.solr.core.CoreContainer; import org.apache.solr.core.SolrCore; +import org.apache.solr.cloud.CloudDescriptor; + import org.apache.solr.servlet.HttpSolrCall; import org.apache.solr.common.util.CommandOperation; import org.apache.solr.util.RTimerTree; @@ -132,8 +136,14 @@ default String getHttpMethod() { default HttpSolrCall getHttpSolrCall() { return null; } -} - + default CoreContainer getCoreContainer() { + SolrCore core = getCore(); + return core == null ? null : core.getCoreContainer(); + } + default CloudDescriptor getCloudDescriptor() { + return getCore().getCoreDescriptor().getCloudDescriptor(); + } +} diff --git a/solr/core/src/java/org/apache/solr/response/transform/CoreAugmenterFactory.java b/solr/core/src/java/org/apache/solr/response/transform/CoreAugmenterFactory.java new file mode 100644 index 000000000000..eb53a99a2963 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/response/transform/CoreAugmenterFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.response.transform; + +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.request.SolrQueryRequest; + +public class CoreAugmenterFactory extends TransformerFactory { + + @Override + public DocTransformer create(String field, SolrParams params, SolrQueryRequest req) { + return new ValueAugmenterFactory.ValueAugmenter(field, req.getCore().getName()); + } +} diff --git a/solr/core/src/java/org/apache/solr/response/transform/TransformerFactory.java b/solr/core/src/java/org/apache/solr/response/transform/TransformerFactory.java index ed8a9302b989..87288311608e 100644 --- a/solr/core/src/java/org/apache/solr/response/transform/TransformerFactory.java +++ b/solr/core/src/java/org/apache/solr/response/transform/TransformerFactory.java @@ -51,5 +51,7 @@ public void init(@SuppressWarnings({"rawtypes"})NamedList args) { defaultFactories.put( "json", new RawValueTransformerFactory("json") ); defaultFactories.put( "xml", new RawValueTransformerFactory("xml") ); defaultFactories.put( "geo", new GeoTransformerFactory() ); + defaultFactories.put("core", new CoreAugmenterFactory()); + } } diff --git a/solr/core/src/java/org/apache/solr/search/JoinQParserPlugin.java b/solr/core/src/java/org/apache/solr/search/JoinQParserPlugin.java index 338451419646..3aa11d5a3561 100644 --- a/solr/core/src/java/org/apache/solr/search/JoinQParserPlugin.java +++ b/solr/core/src/java/org/apache/solr/search/JoinQParserPlugin.java @@ -167,7 +167,7 @@ JoinParams parseJoin(QParser qparser) throws SyntaxError { long fromCoreOpenTime = 0; if (fromIndex != null && !fromIndex.equals(qparser.req.getCore().getCoreDescriptor().getName()) ) { - CoreContainer container = qparser.req.getCore().getCoreContainer(); + CoreContainer container = qparser.req.getCoreContainer(); // if in SolrCloud mode, fromIndex should be the name of a single-sharded collection coreName = ScoreJoinQParserPlugin.getCoreName(fromIndex, container); diff --git a/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java b/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java index 51f2dcc1f810..7cd7607d4a5f 100644 --- a/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java +++ b/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java @@ -90,7 +90,7 @@ public OtherCoreJoinQuery(Query fromQuery, String fromField, public Weight createWeight(IndexSearcher searcher, org.apache.lucene.search.ScoreMode scoreMode, float boost) throws IOException { SolrRequestInfo info = SolrRequestInfo.getRequestInfo(); - CoreContainer container = info.getReq().getCore().getCoreContainer(); + CoreContainer container = info.getReq().getCoreContainer(); final SolrCore fromCore = container.getCore(fromIndex); @@ -228,7 +228,7 @@ private Query createQuery(final String fromField, final String fromQueryStr, final String myCore = req.getCore().getCoreDescriptor().getName(); if (fromIndex != null && (!fromIndex.equals(myCore) || byPassShortCircutCheck)) { - CoreContainer container = req.getCore().getCoreContainer(); + CoreContainer container = req.getCoreContainer(); final String coreName = getCoreName(fromIndex, container); final SolrCore fromCore = container.getCore(coreName); diff --git a/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java new file mode 100644 index 000000000000..9cc1a89d922b --- /dev/null +++ b/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.servlet; + +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.solr.api.CoordinatorV2HttpSolrCall; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.CoreDescriptor; +import org.apache.solr.core.SolrCore; +import org.apache.solr.logging.MDCLoggingContext; +import org.apache.solr.request.DelegatingSolrQueryRequest; +import org.apache.solr.request.LocalSolrQueryRequest; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A coordinator node can serve requests as if it hosts all collections in the cluster. it does so + * by hosting a synthetic replica for each configset used in the cluster. + * + *

This class is responsible for forwarding the requests to the right core when the node is + * acting as a Coordinator The responsibilities also involve creating a synthetic collection or + * replica if they do not exist. It also sets the right threadlocal variables which reflects the + * current collection being served. + */ +public class CoordinatorHttpSolrCall extends HttpSolrCall { + public static final String SYNTHETIC_COLL_PREFIX = + ".sys." + "COORDINATOR-COLL-"; + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private String collectionName; + private final Factory factory; + + public CoordinatorHttpSolrCall( + Factory factory, + SolrDispatchFilter solrDispatchFilter, + CoreContainer cores, + HttpServletRequest request, + HttpServletResponse response, + boolean retry) { + super(solrDispatchFilter, cores, request, response, retry); + this.factory = factory; + } + + @Override + protected SolrCore getCoreByCollection(String collectionName, boolean isPreferLeader) { + log.info("getCoreByCollection(String collectionName({})", collectionName); + this.collectionName = collectionName; + SolrCore core = super.getCoreByCollection(collectionName, isPreferLeader); + if (core != null) return core; + if (!path.endsWith("/select")) return null; + return getCore(factory, this, collectionName, isPreferLeader); + } + + public static SolrCore getCore( + Factory factory, HttpSolrCall solrCall, String collectionName, boolean isPreferLeader) { + log.info("CoordinatorHttpSolrCall#getCore({})", collectionName); + String syntheticCoreName = factory.collectionVsCoreNameMapping.get(collectionName); + if (syntheticCoreName != null) { + SolrCore syntheticCore = solrCall.cores.getCore(syntheticCoreName); + setMDCLoggingContext(collectionName); + return syntheticCore; + } else { + ZkStateReader zkStateReader = solrCall.cores.getZkController().getZkStateReader(); + ClusterState clusterState = zkStateReader.getClusterState(); + DocCollection coll = clusterState.getCollectionOrNull(collectionName, true); + SolrCore core = null; + if (coll != null) { + String confName = null; + try { + confName = zkStateReader.readConfigName(collectionName); + } catch (KeeperException e) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Could not locate config for collection [" + + collectionName + + "] ", e); + } + String syntheticCollectionName = getSyntheticCollectionName(confName); + + DocCollection syntheticColl = clusterState.getCollectionOrNull(syntheticCollectionName); + synchronized (CoordinatorHttpSolrCall.class) { + if (syntheticColl == null) { + // no synthetic collection for this config, let's create one + if (log.isInfoEnabled()) { + log.info( + "synthetic collection: {} does not exist, creating.. ", syntheticCollectionName); + } + + SolrException createException = null; + try { + createColl(syntheticCollectionName, solrCall.cores, confName); + } catch (SolrException exception) { + // concurrent requests could have created the collection hence causing collection + // exists + // exception + createException = exception; + } finally { + syntheticColl = + zkStateReader.getClusterState().getCollectionOrNull(syntheticCollectionName); + } + + // then indeed the collection was not created properly, either by this or other + // concurrent + // requests + if (syntheticColl == null) { + if (createException != null) { + throw createException; // rethrow the exception since such collection was not + // created + } else { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Could not locate synthetic collection [" + + syntheticCollectionName + + "] after creation!"); + } + } + } + + // get docCollection again to ensure we get the fresh state + syntheticColl = + zkStateReader.getClusterState().getCollectionOrNull(syntheticCollectionName); + List nodeNameSyntheticReplicas = + syntheticColl.getReplicas(solrCall.cores.getZkController().getNodeName()); + if (nodeNameSyntheticReplicas == null || nodeNameSyntheticReplicas.isEmpty()) { + // this node does not have a replica. add one + if (log.isInfoEnabled()) { + log.info( + "this node does not have a replica of the synthetic collection: {} , adding replica ", + syntheticCollectionName); + } + + addReplica(syntheticCollectionName, solrCall.cores); + } + + // still have to ensure that it's active, otherwise super.getCoreByCollection + // will return null and then CoordinatorHttpSolrCall will call getCore again + // hence creating a calling loop + try { + zkStateReader.waitForState( + syntheticCollectionName, + 10, + TimeUnit.SECONDS, + docCollection -> { + for (Replica nodeNameSyntheticReplica : + docCollection.getReplicas(solrCall.cores.getZkController().getNodeName())) { + if (nodeNameSyntheticReplica.getState() == Replica.State.ACTIVE) { + return true; + } + } + return false; + }); + } catch (Exception e) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Failed to wait for active replica for synthetic collection [" + + syntheticCollectionName + + "]", + e); + } + } + + core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader); + if (core != null) { + factory.collectionVsCoreNameMapping.put(collectionName, core.getName()); + // for the watcher, only remove on collection deletion (ie collection == null), since + // watch from coordinator is collection specific + solrCall + .cores + .getZkController() + .getZkStateReader() + .registerDocCollectionWatcher( + collectionName, + collection -> { + if (collection == null) { + factory.collectionVsCoreNameMapping.remove(collectionName); + return true; + } else { + return false; + } + }); + if (log.isDebugEnabled()) { + log.debug("coordinator node, returns synthetic core: {}", core.getName()); + } + } + setMDCLoggingContext(collectionName); + return core; + } + return null; + } + } + + public static String getSyntheticCollectionName(String configName) { + return SYNTHETIC_COLL_PREFIX + configName; + } + + /** + * Overrides the MDC context as the core set was synthetic core, which does not reflect the + * collection being operated on + */ + private static void setMDCLoggingContext(String collectionName) { + MDCLoggingContext.setCollection(collectionName); + + // below is irrelevant for call to coordinator + MDCLoggingContext.setCoreName(null); + MDCLoggingContext.setShard(null); + MDCLoggingContext.setCoreName(null); + } + + private static void addReplica(String syntheticCollectionName, CoreContainer cores) { + SolrQueryResponse rsp = new SolrQueryResponse(); + try { + CollectionAdminRequest.AddReplica addReplicaRequest = + CollectionAdminRequest.addReplicaToShard(syntheticCollectionName, "shard1") + // we are fixing the name, so that no two replicas are created in the same node + .setNode(cores.getZkController().getNodeName()); + addReplicaRequest.setWaitForFinalState(true); + cores + .getCollectionsHandler() + .handleRequestBody(new LocalSolrQueryRequest(null, addReplicaRequest.getParams()), rsp); + if (rsp.getValues().get("success") == null) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Could not auto-create collection: " + Utils.toJSONString(rsp.getValues())); + } + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + } + + private static void createColl( + String syntheticCollectionName, CoreContainer cores, String confName) { + SolrQueryResponse rsp = new SolrQueryResponse(); + try { + CollectionAdminRequest.Create collCreationRequest = + CollectionAdminRequest.createCollection(syntheticCollectionName, confName, 1, 1) + .setCreateNodeSet(cores.getZkController().getNodeName()); + collCreationRequest.setWaitForFinalState(true); + SolrParams params = collCreationRequest.getParams(); + if (log.isInfoEnabled()) { + log.info("sending collection admin command : {}", Utils.toJSONString(params)); + } + cores.getCollectionsHandler().handleRequestBody(new LocalSolrQueryRequest(null, params), rsp); + if (rsp.getValues().get("success") == null) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Could not create :" + + syntheticCollectionName + + " collection: " + + Utils.toJSONString(rsp.getValues())); + } + } catch (SolrException e) { + throw e; + + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + } + + @Override + protected void init() throws Exception { + super.init(); + if (action == SolrDispatchFilter.Action.PROCESS && core != null) { + solrReq = wrappedReq(solrReq, collectionName, this); + } + } + + @Override + protected String getCoreOrColName() { + return collectionName; + } + + public static SolrQueryRequest wrappedReq( + SolrQueryRequest delegate, String collectionName, HttpSolrCall httpSolrCall) { + if(collectionName== null) return delegate; + Properties p = new Properties(); + log.info("CoordinatorHttpSolrCall#wrappedReq({})",collectionName); + p.put(CoreDescriptor.CORE_COLLECTION, collectionName); + p.put(CloudDescriptor.REPLICA_TYPE, Replica.Type.PULL.toString()); + p.put(CoreDescriptor.CORE_SHARD, "_"); + + CloudDescriptor cloudDescriptor = + new CloudDescriptor( + delegate.getCore().getCoreDescriptor(), delegate.getCore().getName(), p); + return new DelegatingSolrQueryRequest(delegate) { + @Override + public HttpSolrCall getHttpSolrCall() { + return httpSolrCall; + } + + @Override + public CloudDescriptor getCloudDescriptor() { + return cloudDescriptor; + } + }; + } + + // The factory that creates an instance of HttpSolrCall + public static class Factory implements SolrDispatchFilter.HttpSolrCallFactory { + private final Map collectionVsCoreNameMapping = new ConcurrentHashMap<>(); + + @Override + public HttpSolrCall createInstance( + SolrDispatchFilter filter, + String path, + CoreContainer cores, + HttpServletRequest request, + HttpServletResponse response, + boolean retry) { + if ((path.startsWith("/____v2/") || path.equals("/____v2"))) { + return new CoordinatorV2HttpSolrCall(this, filter, cores, request, response, retry); + } else if (path.startsWith("/" + SYNTHETIC_COLL_PREFIX)) { + return SolrDispatchFilter.HttpSolrCallFactory.super.createInstance( + filter, path, cores, request, response, retry); + } else { + return new CoordinatorHttpSolrCall(this, filter, cores, request, response, retry); + } + } + } +} diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java index c58d52b9f44b..607e7d1f617e 100644 --- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java +++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java @@ -228,6 +228,8 @@ protected void init() throws Exception { } queryParams = SolrRequestParsers.parseQueryString(req.getQueryString()); + log.info("HttpSolrCall.init({}?{})", req.getRequestURL(), req.getQueryString()); + // unused feature ? int idx = path.indexOf(':'); @@ -603,6 +605,13 @@ public Action call() throws IOException { } } + protected String getCoreOrColName() { + String coreOrColName = HttpSolrCall.this.origCorename; + if (coreOrColName == null && getCore() != null) { + coreOrColName = getCore().getName(); + } + return coreOrColName; + } private boolean shouldAudit() { return cores.getAuditLoggerPlugin() != null; diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index 9f0a7b3f275e..90b977d6c569 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -69,6 +69,7 @@ import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.NodeConfig; +import org.apache.solr.core.NodeRoles; import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrInfoBean; import org.apache.solr.core.SolrXmlConfig; @@ -103,6 +104,8 @@ public class SolrDispatchFilter extends BaseSolrFilter { protected final CountDownLatch init = new CountDownLatch(1); protected String abortErrorMessage = null; + private HttpSolrCallFactory solrCallFactory; + //TODO using Http2Client protected HttpClient httpClient; private ArrayList excludePatterns; @@ -182,6 +185,13 @@ public void init(FilterConfig config) throws ServletException { } coresInit = createCoreContainer(computeSolrHome(config), extraProperties); + boolean isCoordinator = + NodeRoles.MODE_ON.equals( + coresInit + .nodeRoles + .getRoleMode(NodeRoles.Role.COORDINATOR)); + solrCallFactory = + isCoordinator ? new CoordinatorHttpSolrCall.Factory() : new HttpSolrCallFactory() {}; this.httpClient = coresInit.getUpdateShardHandler().getDefaultHttpClient(); setupJvmMetrics(coresInit); @@ -479,11 +489,13 @@ private void consumeInputFully(HttpServletRequest req, HttpServletResponse respo protected HttpSolrCall getHttpSolrCall(HttpServletRequest request, HttpServletResponse response, boolean retry) { String path = ServletUtils.getPathAfterContext(request); - if (isV2Enabled && (path.startsWith("/____v2/") || path.equals("/____v2"))) { - return new V2HttpCall(this, cores, request, response, false); - } else { - return new HttpSolrCall(this, cores, request, response, retry); + CoreContainer cores = null; + try { + cores = getCores(); + } catch (Exception e) { + throw new SolrException(ErrorCode.SERVER_ERROR, "Core Container Unavailable"); } + return solrCallFactory.createInstance(this, path, cores, request, response, retry); } private boolean authenticateRequest(HttpServletRequest request, HttpServletResponse response, final AtomicReference wrappedRequest) throws IOException { @@ -681,4 +693,19 @@ public void close() { public void closeOnDestroy(boolean closeOnDestroy) { this.closeOnDestroy = closeOnDestroy; } + public interface HttpSolrCallFactory { + default HttpSolrCall createInstance( + SolrDispatchFilter filter, + String path, + CoreContainer cores, + HttpServletRequest request, + HttpServletResponse response, + boolean retry) { + if (filter.isV2Enabled && (path.startsWith("/____v2/") || path.equals("/____v2"))) { + return new V2HttpCall(filter, cores, request, response, retry); + } else { + return new HttpSolrCall(filter, cores, request, response, retry); + } + } + } } diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java index 789f1455a508..590ce372e8ea 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java @@ -1838,7 +1838,7 @@ public void doReplay(TransactionLog translog) { UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null); UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp); - OrderedExecutor executor = inSortedOrder ? null : req.getCore().getCoreContainer().getReplayUpdatesExecutor(); + OrderedExecutor executor = inSortedOrder ? null : req.getCoreContainer().getReplayUpdatesExecutor(); AtomicInteger pendingTasks = new AtomicInteger(0); AtomicReference exceptionOnExecuteUpdate = new AtomicReference<>(); diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java index 93c1bf2871e0..deaa1d9f9f5f 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java @@ -50,7 +50,7 @@ public static void addParamToDistributedRequestWhitelist(final SolrQueryRequest public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { - final boolean isZkAware = req.getCore().getCoreContainer().isZooKeeperAware(); + final boolean isZkAware = req.getCoreContainer().isZooKeeperAware(); DistributedUpdateProcessor distribUpdateProcessor = isZkAware ? diff --git a/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java index d95f946578b0..112eae2a883a 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java @@ -93,10 +93,10 @@ public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestPro // todo: a core should have a more direct way of finding a collection name, and the collection properties SolrCore core = req.getCore(); CoreDescriptor coreDescriptor = core.getCoreDescriptor(); - CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor(); + CloudDescriptor cloudDescriptor = req.getCloudDescriptor(); if (cloudDescriptor != null) { String collectionName = cloudDescriptor.getCollectionName(); - CoreContainer coreContainer = core.getCoreContainer(); + CoreContainer coreContainer = req.getCoreContainer(); ZkController zkController = coreContainer.getZkController(); ZkStateReader zkStateReader = zkController.getZkStateReader(); Map collectionProperties = zkStateReader.getCollectionProperties(collectionName, CACHE_FOR_MILLIS); @@ -127,7 +127,7 @@ public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestPro } private static Map getAliasProps(SolrQueryRequest req, String aliasName) { - ZkController zkController = req.getCore().getCoreContainer().getZkController(); + ZkController zkController = req.getCoreContainer().getZkController(); final Map aliasProperties = zkController.getZkStateReader().getAliases().getCollectionAliasProperties(aliasName); if (aliasProperties.isEmpty()) { throw RoutedAlias.newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map @@ -141,8 +141,8 @@ private RoutedAliasUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor this.routedAlias = routedAlias; assert aliasDistribPhase == DistribPhase.NONE; final SolrCore core = req.getCore(); - final CoreContainer cc = core.getCoreContainer(); - this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName(); + final CoreContainer cc = req.getCoreContainer(); + this.thisCollection = req.getCloudDescriptor().getCollectionName(); this.req = req; this.zkController = cc.getZkController(); this.cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler()); diff --git a/solr/core/src/test-files/solr/configsets/cache-control/conf/schema.xml b/solr/core/src/test-files/solr/configsets/cache-control/conf/schema.xml new file mode 100644 index 000000000000..cef2b0a12e3a --- /dev/null +++ b/solr/core/src/test-files/solr/configsets/cache-control/conf/schema.xml @@ -0,0 +1,27 @@ + + + + + + + + + + + + + id + \ No newline at end of file diff --git a/solr/core/src/test-files/solr/configsets/cache-control/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cache-control/conf/solrconfig.xml new file mode 100644 index 000000000000..ee9a217c06a7 --- /dev/null +++ b/solr/core/src/test-files/solr/configsets/cache-control/conf/solrconfig.xml @@ -0,0 +1,54 @@ + + + + + + + + + ${solr.data.dir:} + + + + + ${tests.luceneMatchVersion:LATEST} + + + + ${solr.commitwithin.softcommit:true} + + + + + + + max-age=30, public + + + + + + explicit + true + text + + + + + + : + \ No newline at end of file diff --git a/solr/core/src/test-files/solr/configsets/conf3/conf/schema.xml b/solr/core/src/test-files/solr/configsets/conf3/conf/schema.xml new file mode 100644 index 000000000000..85f7ed35453b --- /dev/null +++ b/solr/core/src/test-files/solr/configsets/conf3/conf/schema.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + id + + + + + + diff --git a/solr/core/src/test-files/solr/configsets/conf3/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/conf3/conf/solrconfig.xml new file mode 100644 index 000000000000..0b3ec4d24ce1 --- /dev/null +++ b/solr/core/src/test-files/solr/configsets/conf3/conf/solrconfig.xml @@ -0,0 +1,68 @@ + + + + + + + + + ${solr.data.dir:} + + + + + ${tests.luceneMatchVersion:LATEST} + + + + ${solr.commitwithin.softcommit:true} + + + + + + + explicit + true + text + + + + + + + + + + + + + + + diff --git a/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java b/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java index b40a7a25c258..2f682fb10c0d 100644 --- a/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java +++ b/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java @@ -16,6 +16,7 @@ */ package org.apache.solr.common.cloud; +import java.util.Collections; import java.util.Set; /** @@ -32,5 +33,7 @@ public Set getStateWatchers(String collection) { return zkStateReader.getStateWatchers(collection); } - + public Set getWatchedCollections() { + return Collections.unmodifiableSet(zkStateReader.getCollectionWatches().keySet()); + } } diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java new file mode 100644 index 000000000000..f56ac85d63bb --- /dev/null +++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java @@ -0,0 +1,888 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.search; + +import static org.apache.solr.common.params.CommonParams.OMIT_HEADER; +import static org.apache.solr.common.params.CommonParams.TRUE; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Date; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableList; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.Http2SolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.cloud.MiniSolrCloudCluster; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.cloud.ZkStateReaderAccessor; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ShardParams; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrNamedThreadFactory; +import org.apache.solr.core.NodeRoles; +import org.apache.solr.servlet.CoordinatorHttpSolrCall; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestCoordinatorRole extends SolrCloudTestCase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public void testSimple() throws Exception { + MiniSolrCloudCluster cluster = + configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure(); + try { + CloudSolrClient client = cluster.getSolrClient(); + String COLLECTION_NAME = "test_coll"; + String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.getSyntheticCollectionName("conf"); + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4); + UpdateRequest ur = new UpdateRequest(); + for (int i = 0; i < 10; i++) { + SolrInputDocument doc2 = new SolrInputDocument(); + doc2.addField("id", "" + i); + ur.add(doc2); + } + + ur.commit(client, COLLECTION_NAME); + QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*")); + assertEquals(10, rsp.getResults().getNumFound()); + + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); + final JettySolrRunner coordinatorJetty; + try { + coordinatorJetty = cluster.startJettySolrRunner(); + } finally { + System.clearProperty(NodeRoles.NODE_ROLES_PROP); + } + QueryResponse rslt = + (QueryResponse) new QueryRequest(new SolrQuery("*:*")) + .setPreferredNodes(ImmutableList.of(coordinatorJetty.getNodeName())) + .process(client, COLLECTION_NAME); + + assertEquals(10, rslt.getResults().size()); + + DocCollection collection = + cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION); + assertNotNull(collection); + + Set expectedNodes = new HashSet<>(); + expectedNodes.add(coordinatorJetty.getNodeName()); + collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName())); + assertTrue(expectedNodes.isEmpty()); + } finally { + cluster.shutdown(); + } + } + + public void testMultiCollectionMultiNode() throws Exception { + MiniSolrCloudCluster cluster = + configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure(); + try { + CloudSolrClient client = cluster.getSolrClient(); + String COLLECTION_NAME = "test_coll"; + String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.getSyntheticCollectionName("conf"); + for (int j = 1; j <= 10; j++) { + String collname = COLLECTION_NAME + "_" + j; + CollectionAdminRequest.createCollection(collname, "conf", 2, 2) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(collname, 2, 4); + UpdateRequest ur = new UpdateRequest(); + for (int i = 0; i < 10; i++) { + SolrInputDocument doc2 = new SolrInputDocument(); + doc2.addField("id", "" + i); + ur.add(doc2); + } + + ur.commit(client, collname); + QueryResponse rsp = client.query(collname, new SolrQuery("*:*")); + assertEquals(10, rsp.getResults().getNumFound()); + } + + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); + final JettySolrRunner coordinatorJetty1; + final JettySolrRunner coordinatorJetty2; + try { + coordinatorJetty1 = cluster.startJettySolrRunner(); + coordinatorJetty2 = cluster.startJettySolrRunner(); + } finally { + System.clearProperty(NodeRoles.NODE_ROLES_PROP); + } + for (int j = 1; j <= 10; j++) { + String collname = COLLECTION_NAME + "_" + j; + QueryResponse rslt = + (QueryResponse) new QueryRequest(new SolrQuery("*:*")) + .setPreferredNodes(ImmutableList.of(coordinatorJetty1.getNodeName())) + .process(client, collname); + + assertEquals(10, rslt.getResults().size()); + } + + for (int j = 1; j <= 10; j++) { + String collname = COLLECTION_NAME + "_" + j; + QueryResponse rslt = + (QueryResponse) new QueryRequest(new SolrQuery("*:*")) + .setPreferredNodes(ImmutableList.of(coordinatorJetty2.getNodeName())) + .process(client, collname); + + assertEquals(10, rslt.getResults().size()); + } + + DocCollection collection = + cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION); + assertNotNull(collection); + + int coordNode1NumCores = coordinatorJetty1.getCoreContainer().getNumAllCores(); + assertEquals("Unexpected number of cores found for coordinator node", 1, coordNode1NumCores); + int coordNode2NumCores = coordinatorJetty2.getCoreContainer().getNumAllCores(); + assertEquals("Unexpected number of cores found for coordinator node", 1, coordNode2NumCores); + } finally { + cluster.shutdown(); + } + } + + public void testNRTRestart() throws Exception { + // we restart jetty and expect to find on disk data - need a local fs directory + useFactory(null); + String COLL = "coordinator_test_coll"; + MiniSolrCloudCluster cluster = + configureCluster(3) + .withJettyConfig(jetty -> jetty.enableV2(true)) + .addConfig("conf", configset("conf3")) + .configure(); + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); + JettySolrRunner qaJetty = cluster.startJettySolrRunner(); + String qaJettyBase = qaJetty.getBaseUrl().toString(); + System.clearProperty(NodeRoles.NODE_ROLES_PROP); + ExecutorService executor = + ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("manipulateJetty")); + try { + CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0, 1) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(COLL, 1, 2); + DocCollection docColl = + cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL); + Replica nrtReplica = docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0); + assertNotNull(nrtReplica); + String nrtCore = nrtReplica.getCoreName(); + Replica pullReplica = docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0); + assertNotNull(pullReplica); + String pullCore = pullReplica.getCoreName(); + + SolrInputDocument sid = new SolrInputDocument(); + sid.addField("id", "123"); + sid.addField("desc_s", "A Document"); + JettySolrRunner nrtJetty = null; + JettySolrRunner pullJetty = null; + for (JettySolrRunner j : cluster.getJettySolrRunners()) { + String nodeName = j.getNodeName(); + if (nodeName.equals(nrtReplica.getNodeName())) { + nrtJetty = j; + } else if (nodeName.equals(pullReplica.getNodeName())) { + pullJetty = j; + } + } + assertNotNull(nrtJetty); + assertNotNull(pullJetty); + try (SolrClient client = pullJetty.newClient()) { + client.add(COLL, sid); + client.commit(COLL); + assertEquals( + nrtCore, + getHostCoreName( + COLL, qaJettyBase, p -> p.add(ShardParams.SHARDS_PREFERENCE, "replica.type:NRT"))); + assertEquals( + pullCore, + getHostCoreName( + COLL, qaJettyBase, p -> p.add(ShardParams.SHARDS_PREFERENCE, "replica.type:PULL"))); + // Now , kill NRT jetty + JettySolrRunner nrtJettyF = nrtJetty; + JettySolrRunner pullJettyF = pullJetty; + Random r = random(); + final long establishBaselineMs = r.nextInt(1000); + final long nrtDowntimeMs = r.nextInt(10000); + // NOTE: for `pullServiceTimeMs`, it can't be super-short. This is just to simplify our + // indexing code, + // based on the fact that our indexing is based on a PULL-node client. + final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000); + Future jettyManipulationFuture = + executor.submit( + () -> { + // we manipulate the jetty instances in a separate thread to more closely mimic + // the behavior we'd see irl. + try { + Thread.sleep(establishBaselineMs); + log.info("stopping NRT jetty ..."); + nrtJettyF.stop(); + log.info("NRT jetty stopped."); + Thread.sleep(nrtDowntimeMs); // let NRT be down for a while + log.info("restarting NRT jetty ..."); + nrtJettyF.start(true); + log.info("NRT jetty restarted."); + // once NRT is back up, we expect PULL to continue serving until the TTL on ZK + // state used for query request routing has expired (60s). But here we force a + // return to NRT by stopping the PULL replica after a brief delay ... + Thread.sleep(pullServiceTimeMs); + log.info("stopping PULL jetty ..."); + pullJettyF.stop(); + log.info("PULL jetty stopped."); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + String hostCore; + long start = new Date().getTime(); + long individualRequestStart = start; + int count = 0; + while (nrtCore.equals( + hostCore = + getHostCoreName( + COLL, + qaJettyBase, + p -> p.add(ShardParams.SHARDS_PREFERENCE, "replica.type:NRT")))) { + count++; + Thread.sleep(100); + individualRequestStart = new Date().getTime(); + } + long now = new Date().getTime(); + log.info( + "phase1 NRT queries count={}, overall_duration={}, baseline_expected_overall_duration={}, switch-to-pull_duration={}", + count, + now - start, + establishBaselineMs, + now - individualRequestStart); + // default tolerance of 500ms below should suffice. Failover to PULL for this case should be + // very fast, because our QA-based client already knows both replicas are active, the index + // is stable, so the moment the client finds NRT is down it should be able to failover + // immediately and transparently to PULL. + assertEquals( + "when we break out of the NRT query loop, should be b/c routed to PULL", + pullCore, + hostCore); + SolrInputDocument d = new SolrInputDocument(); + d.addField("id", "345"); + d.addField("desc_s", "Another Document"); + // attempts to add another doc while NRT is down should fail, then eventually succeed when + // NRT comes back up + count = 0; + start = new Date().getTime(); + individualRequestStart = start; + for (; ; ) { + try { + client.add(COLL, d); + client.commit(COLL); + break; + } catch (SolrException ex) { + // we expect these until nrtJetty is back up. + count++; + Thread.sleep(100); + } + individualRequestStart = new Date().getTime(); + } + now = new Date().getTime(); + log.info( + "successfully added another doc; duration: {}, overall_duration={}, baseline_expected_overall_duration={}, exception_count={}", + now - individualRequestStart, + now - start, + nrtDowntimeMs, + count); + // NRT replica is back up, registered as available with Zk, and availability info has been + // pulled down by our PULL-replica-based `client`, forwarded indexing command to NRT, + // index/commit completed. All of this accounts for the 3000ms tolerance allowed for below. + // This is not a strict value, and if it causes failures regularly we should feel free to + // increase the tolerance; but it's meant to provide a stable baseline from which to detect + // regressions. + count = 0; + start = new Date().getTime(); + individualRequestStart = start; + while (pullCore.equals( + hostCore = + getHostCoreName( + COLL, + qaJettyBase, + p -> { + p.set(CommonParams.Q, "id:345"); + p.add(ShardParams.SHARDS_PREFERENCE, "replica.type:NRT"); + }))) { + count++; + Thread.sleep(100); + individualRequestStart = new Date().getTime(); + } + now = new Date().getTime(); + log.info( + "query retries between NRT index-ready and query-ready: {}; overall_duration={}; baseline_expected_overall_duration={}; failover-request_duration={}", + count, + now - start, + pullServiceTimeMs, + now - individualRequestStart); + assertEquals(nrtCore, hostCore); + // allow any exceptions to propagate + jettyManipulationFuture.get(); + + // next phase: just toggle a bunch + // TODO: could separate this out into a different test method, but this should suffice for + // now + pullJetty.start(true); + AtomicBoolean done = new AtomicBoolean(); + long runMinutes = 1; + long finishTimeMs = + new Date().getTime() + TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES); + JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF, pullJettyF}; + Random threadRandom = new Random(r.nextInt()); + Future f = + executor.submit( + () -> { + int iteration = 0; + while (new Date().getTime() < finishTimeMs && !done.get()) { + int idx = iteration++ % jettys.length; + JettySolrRunner toManipulate = jettys[idx]; + try { + int serveTogetherTime = threadRandom.nextInt(7000); + int downTime = threadRandom.nextInt(7000); + log.info("serving together for {}ms", serveTogetherTime); + Thread.sleep(serveTogetherTime); + log.info("stopping {} ...", idx); + toManipulate.stop(); + log.info("stopped {}.", idx); + Thread.sleep(downTime); + log.info("restarting {} ...", idx); + toManipulate.start(true); + log.info("restarted {}.", idx); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + done.set(true); + return iteration; + }); + count = 0; + start = new Date().getTime(); + try { + do { + if (pullCore.equals( + getHostCoreName( + COLL, + qaJettyBase, + p -> { + p.set(CommonParams.Q, "id:345"); + p.add(ShardParams.SHARDS_PREFERENCE, "replica.type:NRT"); + }))) { + done.set(true); + } + count++; + Thread.sleep(100); + } while (!done.get()); + } finally { + final String result; + if (done.getAndSet(true)) { + result = "Success"; + } else { + // not yet set to done, completed abnormally (exception will be thrown beyond `finally` + // block) + result = "Failure"; + } + Integer toggleCount = f.get(); + long secondsDuration = + TimeUnit.SECONDS.convert(new Date().getTime() - start, TimeUnit.MILLISECONDS); + log.info( + "{}! {} seconds, {} toggles, {} requests served", + result, + secondsDuration, + toggleCount, + count); + } + } + } finally { + try { + ExecutorUtil.shutdownAndAwaitTermination(executor); + } finally { + cluster.shutdown(); + } + } + } + + private String getHostCoreName(String COLL, String qaNode, Consumer p) + throws Exception { + boolean found = false; + SolrQuery q = + new SolrQuery( + CommonParams.Q, + "*:*", + CommonParams.FL, + "id,desc_s,_core_:[core]", + OMIT_HEADER, + TRUE, + CommonParams.WT, + CommonParams.JAVABIN); + p.accept(q); + SolrDocumentList docs = null; + try (SolrClient solrClient = new Http2SolrClient.Builder(qaNode).build()) { + for (int i = 0; i < 100; i++) { + try { + QueryResponse queryResponse = solrClient.query(COLL, q); + docs = queryResponse.getResults(); + assertNotNull("Docs should not be null. Query response was: " + queryResponse, docs); + if (docs.size() > 0) { + found = true; + break; + } + } catch (SolrException ex) { + // we know we're doing tricky things that might cause transient errors + // TODO: all these query requests go to the QA node -- should QA propagate internal + // request errors to the external client (and the external client retry?) or should QA + // attempt to failover transparently in the event of an error? + if (i < 5) { + log.info("swallowing transient error", ex); + } else { + log.error("only expect actual _errors_ within a small window (e.g. 500ms)", ex); + fail("initial error time threshold exceeded"); + } + } + Thread.sleep(100); + } + } + assertTrue(found); + return (String) docs.get(0).getFieldValue("_core_"); + } + + public void testConcurrentAccess() throws Exception { + final int DATA_NODE_COUNT = 2; + final int COORDINATOR_NODE_COUNT = 4; + MiniSolrCloudCluster cluster = + configureCluster(DATA_NODE_COUNT).addConfig("conf", configset("cloud-minimal")).configure(); + + List dataNodes = + cluster.getJettySolrRunners().stream() + .map(JettySolrRunner::getNodeName) + .collect(Collectors.toList()); + + try { + CloudSolrClient client = cluster.getSolrClient(); + String COLLECTION_PREFIX = "test_coll_"; + + final int COLLECTION_COUNT = 10; + final int DOC_PER_COLLECTION_COUNT = 1000; + + List collectionNames = new ArrayList<>(); + for (int i = 0; i < COLLECTION_COUNT; i++) { + String collectionName = COLLECTION_PREFIX + i; + CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1) + .setCreateNodeSet(String.join(",", dataNodes)) // only put data onto the 2 data nodes + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(collectionName, 2, 2); + collectionNames.add(collectionName); + } + + for (String collectionName : collectionNames) { + UpdateRequest ur = new UpdateRequest(); + for (int i = 0; i < DOC_PER_COLLECTION_COUNT; i++) { + SolrInputDocument doc2 = new SolrInputDocument(); + doc2.addField("id", collectionName + "-" + i); + ur.add(doc2); + } + ur.commit(client, collectionName); + QueryResponse rsp = client.query(collectionName, new SolrQuery("*:*")); + assertEquals(DOC_PER_COLLECTION_COUNT, rsp.getResults().getNumFound()); + } + + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); + List coordinatorNodes = new ArrayList<>(); + try { + for (int i = 0; i < COORDINATOR_NODE_COUNT; i++) { + JettySolrRunner coordinatorJetty = cluster.startJettySolrRunner(); + coordinatorNodes.add(coordinatorJetty.getNodeName()); + } + } finally { + System.clearProperty(NodeRoles.NODE_ROLES_PROP); + } + + int THREAD_COUNT = 10; + int RUN_COUNT = 20; + // final AtomicInteger runCounter = new AtomicInteger(); + // 10 threads to concurrently access the collections and ensure data are not mixed up + ExecutorService executorService = + ExecutorUtil.newMDCAwareFixedThreadPool( + THREAD_COUNT, new SolrNamedThreadFactory(this.getClass().getSimpleName())); + List> testFutures = new ArrayList<>(); + + for (int i = 0; i < RUN_COUNT; i++) { + final int currentRun = i; + testFutures.add( + executorService.submit( + () -> { + final String collectionName = + collectionNames.get(currentRun % collectionNames.size()); + final String coordinatorNode = + coordinatorNodes.get(currentRun % coordinatorNodes.size()); + QueryResponse response = + (QueryResponse) new QueryRequest(new SolrQuery("*:*")) + .setPreferredNodes(ImmutableList.of(coordinatorNode)) + .process(client, collectionName); + assertEquals(DOC_PER_COLLECTION_COUNT, response.getResults().getNumFound()); + // ensure docs have the correct id (ie not mixing up with other collections) + for (SolrDocument doc : response.getResults()) { + assertTrue(((String) doc.getFieldValue("id")).startsWith(collectionName)); + } + return null; + })); + } + for (Future testFuture : testFutures) { + testFuture.get(); // check for any exceptions/failures + } + + // number of replicas created in the synthetic collection should be one per coordinator node + assertEquals( + COORDINATOR_NODE_COUNT, + client + .getZkStateReader() + .getClusterState() + .getCollection(CoordinatorHttpSolrCall.getSyntheticCollectionName("conf")) + .getReplicas() + .size()); + + executorService.shutdown(); + executorService.awaitTermination(10, TimeUnit.SECONDS); + } finally { + cluster.shutdown(); + } + } + + public void testConfigset() throws Exception { + final int DATA_NODE_COUNT = 1; + MiniSolrCloudCluster cluster = + configureCluster(DATA_NODE_COUNT) + .addConfig("conf1", configset("cloud-minimal")) + .addConfig("conf2", configset("cache-control")) + .configure(); + List dataNodes = + cluster.getJettySolrRunners().stream() + .map(JettySolrRunner::getNodeName) + .collect(Collectors.toList()); + + try { + CollectionAdminRequest.createCollection("c1", "conf1", 2, 1) + .setMaxShardsPerNode(10) + . process(cluster.getSolrClient()); + cluster.waitForActiveCollection("c1", 2, 2); + CollectionAdminRequest.createCollection("c2", "conf2", 2, 1) + .setMaxShardsPerNode(10) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection("c2", 2, 2); + + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); + JettySolrRunner coordinatorJetty; + try { + coordinatorJetty = cluster.startJettySolrRunner(); + } finally { + System.clearProperty(NodeRoles.NODE_ROLES_PROP); + } + + // Tricky to test configset, since operation such as collection status would direct it to the + // OS node. + // So we use query and check the cache response header, which is determined by the + // solr-config.xml in the configset + // However using solr client would drop cache response header, hence we need to use the + // underlying httpClient which has SSL correctly configured + + try (HttpSolrClient solrClient = + new HttpSolrClient.Builder(coordinatorJetty.getBaseUrl().toString()).build()) { + HttpResponse response = + solrClient + .getHttpClient() + .execute(new HttpGet(coordinatorJetty.getBaseUrl() + "/c1/select?q=*:*")); + // conf1 has no cache-control + assertNull(response.getFirstHeader("cache-control")); + + response = + solrClient + .getHttpClient() + .execute(new HttpGet(coordinatorJetty.getBaseUrl() + "/c2/select?q=*:*")); + // conf2 has cache-control defined + assertTrue(response.getFirstHeader("cache-control").getValue().contains("max-age=30")); + } + } finally { + cluster.shutdown(); + } + } + + public void testWatch() throws Exception { + final int DATA_NODE_COUNT = 1; + MiniSolrCloudCluster cluster = + configureCluster(DATA_NODE_COUNT) + .addConfig("conf1", configset("cloud-minimal")) + .configure(); + final String TEST_COLLECTION_1 = "c1"; + final String TEST_COLLECTION_2 = "c2"; + + try { + CloudSolrClient client = cluster.getSolrClient(); + CollectionAdminRequest.createCollection(TEST_COLLECTION_1, "conf1", 1, 2) + .setMaxShardsPerNode(10) + .process(client); + cluster.waitForActiveCollection(TEST_COLLECTION_1, 1, 2); + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); + JettySolrRunner coordinatorJetty; + try { + coordinatorJetty = cluster.startJettySolrRunner(); + } finally { + System.clearProperty(NodeRoles.NODE_ROLES_PROP); + } + + ZkStateReader zkStateReader = + coordinatorJetty.getCoreContainer().getZkController().getZkStateReader(); + ZkStateReaderAccessor zkWatchAccessor = new ZkStateReaderAccessor(zkStateReader); + + // no watch at first + assertTrue(!zkWatchAccessor.getWatchedCollections().contains(TEST_COLLECTION_1)); + new QueryRequest(new SolrQuery("*:*")) + .setPreferredNodes(ImmutableList.of(coordinatorJetty.getNodeName())) + .process(client, TEST_COLLECTION_1); // ok no exception thrown + + // now it should be watching it after the query + assertTrue(zkWatchAccessor.getWatchedCollections().contains(TEST_COLLECTION_1)); + + // add another collection + CollectionAdminRequest.createCollection(TEST_COLLECTION_2, "conf1", 1, 2).process(client); + cluster.waitForActiveCollection(TEST_COLLECTION_2, 1, 2); + new QueryRequest(new SolrQuery("*:*")) + .setPreferredNodes(ImmutableList.of(coordinatorJetty.getNodeName())) + .process(client, TEST_COLLECTION_2); + // watch both collections + assertTrue(zkWatchAccessor.getWatchedCollections().contains(TEST_COLLECTION_1)); + assertTrue(zkWatchAccessor.getWatchedCollections().contains(TEST_COLLECTION_2)); + + CollectionAdminRequest.deleteReplica(TEST_COLLECTION_1, "shard1", 1).process(client); + cluster.waitForActiveCollection(TEST_COLLECTION_1, 1, 1); + new QueryRequest(new SolrQuery("*:*")) + .setPreferredNodes(ImmutableList.of(coordinatorJetty.getNodeName())) + .process(client, TEST_COLLECTION_1); // ok no exception thrown + + // still one replica left, should not remove the watch + assertTrue(zkWatchAccessor.getWatchedCollections().contains(TEST_COLLECTION_1)); + + // now delete c1 and ensure it's cleared from various logic + CollectionAdminRequest.deleteCollection(TEST_COLLECTION_1).process(client); + zkStateReader.waitForState(TEST_COLLECTION_1, 30, TimeUnit.SECONDS, Objects::isNull); + assertNull(zkStateReader.getCollection(TEST_COLLECTION_1)); // check the cluster state + + // ensure querying throws exception + assertExceptionThrownWithMessageContaining( + SolrException.class, + ImmutableList.of("Collection not found"), + () -> + new QueryRequest(new SolrQuery("*:*")) + .setPreferredNodes(ImmutableList.of(coordinatorJetty.getNodeName())) + .process(client, TEST_COLLECTION_1)); + + // watch should be removed after c1 deletion + assertTrue(!zkWatchAccessor.getWatchedCollections().contains(TEST_COLLECTION_1)); + // still watching c2 + assertTrue(zkWatchAccessor.getWatchedCollections().contains(TEST_COLLECTION_2)); + } finally { + cluster.shutdown(); + } + } + + public void testSplitShard() throws Exception { + final int DATA_NODE_COUNT = 1; + MiniSolrCloudCluster cluster = + configureCluster(DATA_NODE_COUNT) + .addConfig("conf1", configset("cloud-minimal")) + .configure(); + + try { + + final String COLLECTION_NAME = "c1"; + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1", 1, 1) + .setMaxShardsPerNode(10) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(COLLECTION_NAME, 1, 1); + + int DOC_PER_COLLECTION_COUNT = 1000; + UpdateRequest ur = new UpdateRequest(); + for (int i = 0; i < DOC_PER_COLLECTION_COUNT; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", COLLECTION_NAME + "-" + i); + ur.add(doc); + } + CloudSolrClient client = cluster.getSolrClient(); + ur.commit(client, COLLECTION_NAME); + + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); + JettySolrRunner coordinatorJetty; + try { + coordinatorJetty = cluster.startJettySolrRunner(); + } finally { + System.clearProperty(NodeRoles.NODE_ROLES_PROP); + } + + QueryResponse response = (QueryResponse) new QueryRequest(new SolrQuery("*:*")) + .setPreferredNodes(ImmutableList.of(coordinatorJetty.getNodeName())) + .process(client, COLLECTION_NAME); + + assertEquals(DOC_PER_COLLECTION_COUNT, response.getResults().getNumFound()); + + // now split the shard + CollectionAdminRequest.splitShard(COLLECTION_NAME).setShardName("shard1").process(client); + waitForState( + "Failed to wait for child shards after split", + COLLECTION_NAME, + (liveNodes, collectionState) -> + collectionState.getSlice("shard1_0") != null + && collectionState.getSlice("shard1_0").getState() == Slice.State.ACTIVE + && collectionState.getSlice("shard1_1") != null + && collectionState.getSlice("shard1_1").getState() == Slice.State.ACTIVE); + + // delete the parent shard + CollectionAdminRequest.deleteShard(COLLECTION_NAME, "shard1").process(client); + waitForState( + "Parent shard is not yet deleted after split", + COLLECTION_NAME, + (liveNodes, collectionState) -> collectionState.getSlice("shard1") == null); + + response = + (QueryResponse) new QueryRequest(new SolrQuery("*:*")) + .setPreferredNodes(ImmutableList.of(coordinatorJetty.getNodeName())) + .process(client, COLLECTION_NAME); + + assertEquals(DOC_PER_COLLECTION_COUNT, response.getResults().getNumFound()); + } finally { + cluster.shutdown(); + } + } + + public void testMoveReplica() throws Exception { + final int DATA_NODE_COUNT = 2; + MiniSolrCloudCluster cluster = + configureCluster(DATA_NODE_COUNT) + .addConfig("conf1", configset("cloud-minimal")) + .configure(); + + List dataNodes = + cluster.getJettySolrRunners().stream() + .map(JettySolrRunner::getNodeName) + .collect(Collectors.toList()); + try { + + final String COLLECTION_NAME = "c1"; + String fromNode = dataNodes.get(0); // put the shard on first data node + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1", 1, 1) + .setCreateNodeSet(fromNode) + .process(cluster.getSolrClient()); + // ensure replica is placed on the expected node + waitForState( + "Cannot find replica on first node yet", + COLLECTION_NAME, + (liveNodes, collectionState) -> { + if (collectionState.getReplicas().size() == 1) { + Replica replica = collectionState.getReplicas().get(0); + return fromNode.equals(replica.getNodeName()) + && replica.getState() == Replica.State.ACTIVE; + } + return false; + }); + + int DOC_PER_COLLECTION_COUNT = 1000; + UpdateRequest ur = new UpdateRequest(); + for (int i = 0; i < DOC_PER_COLLECTION_COUNT; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", COLLECTION_NAME + "-" + i); + ur.add(doc); + } + CloudSolrClient client = cluster.getSolrClient(); + ur.commit(client, COLLECTION_NAME); + + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); + JettySolrRunner coordinatorJetty; + try { + coordinatorJetty = cluster.startJettySolrRunner(); + } finally { + System.clearProperty(NodeRoles.NODE_ROLES_PROP); + } + + QueryResponse response = + (QueryResponse) new QueryRequest(new SolrQuery("*:*")) + .setPreferredNodes(ImmutableList.of(coordinatorJetty.getNodeName())) + .process(client, COLLECTION_NAME); + + assertEquals(DOC_PER_COLLECTION_COUNT, response.getResults().getNumFound()); + + // now move the shard/replica + String replicaName = getCollectionState(COLLECTION_NAME).getReplicas().get(0).getName(); + String toNodeName = dataNodes.get(1); + CollectionAdminRequest.moveReplica(COLLECTION_NAME, replicaName, toNodeName).process(client); + waitForState( + "Cannot find replica on second node yet after repliac move", + COLLECTION_NAME, + (liveNodes, collectionState) -> { + if (collectionState.getReplicas().size() == 1) { + Replica replica = collectionState.getReplicas().get(0); + return toNodeName.equals(replica.getNodeName()) + && replica.getState() == Replica.State.ACTIVE; + } + return false; + }); + + // We must stop the first node to ensure that query directs to the correct node from + // coordinator. + // In case if coordinator node has the wrong info (replica on first node), it might still + // return valid result if + // we do not stop the first node as first node might forward the query to second node. + cluster.getJettySolrRunners().get(0).stop(); + + response = + (QueryResponse) new QueryRequest(new SolrQuery("*:*")) + .setPreferredNodes(ImmutableList.of(coordinatorJetty.getNodeName())) + .process(client, COLLECTION_NAME); + + assertEquals(DOC_PER_COLLECTION_COUNT, response.getResults().getNumFound()); + } finally { + cluster.shutdown(); + } + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java index 51e92dbbaa84..edfd3d26af10 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -66,7 +67,8 @@ public enum METHOD { private METHOD method = METHOD.GET; private String path = null; - private Map headers; + private Map headers; + private List preferredNodes; private ResponseParser responseParser; private StreamingResponseCallback callback; @@ -92,6 +94,15 @@ public SolrRequest setUseBinaryV2(boolean flag){ return this; } + public SolrRequest setPreferredNodes(List nodes) { + this.preferredNodes = nodes; + return this; + } + + public List getPreferredNodes() { + return this.preferredNodes; + } + private String basicAuthUser, basicAuthPwd; private String basePath; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java index a224a6098a1c..9b6de9a98359 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java @@ -1112,6 +1112,21 @@ protected NamedList sendRequest(@SuppressWarnings({"rawtypes"})SolrReque throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No collection param specified on request and no default collection has been set: " + inputCollections); } + List preferredNodes = request.getPreferredNodes(); + if (preferredNodes != null && !preferredNodes.isEmpty()) { + String joinedInputCollections = StrUtils.join(inputCollections, ','); + List urlList = new ArrayList<>(preferredNodes.size()); + for (String nodeName : preferredNodes) { + urlList.add( + Utils.getBaseUrlForNodeName(nodeName, urlScheme) + "/" + joinedInputCollections); + } + if (!urlList.isEmpty()) { + LBSolrClient.Req req = new LBSolrClient.Req(request, urlList); + LBSolrClient.Rsp rsp = getLbClient().request(req); + return rsp.getResponse(); + } + } + // TODO: not a big deal because of the caching, but we could avoid looking // at every shard when getting leaders if we tweaked some things diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 10d3a925b67b..adb705ab324f 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -110,6 +110,12 @@ public class ZkStateReader implements SolrCloseable { public static final String STATE_TIMESTAMP_PROP = "stateTimestamp"; public static final String COLLECTIONS_ZKNODE = "/collections"; public static final String LIVE_NODES_ZKNODE = "/live_nodes"; + /** + * The following, node_roles and roles.json are for assigning roles to nodes. The node_roles is + * the preferred way (using -Dsolr.node.roles param), and roles.json is used by legacy ADDROLE API + * command. + */ + public static final String NODE_ROLES = "/node_roles"; public static final String ALIASES = "/aliases.json"; public static final String CLUSTER_STATE = "/clusterstate.json"; public static final String CLUSTER_PROPS = "/clusterprops.json"; @@ -2077,6 +2083,10 @@ private boolean updateWatchedCollection(String coll, DocCollection newState) { return updated; } + /* package-private for testing*/ + Map> getCollectionWatches() { + return Collections.unmodifiableMap(collectionWatches); + } public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) { AtomicBoolean watchSet = new AtomicBoolean(false); diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java index e229061628f8..6e4723561c88 100644 --- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java +++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java @@ -146,6 +146,7 @@ import org.apache.solr.util.TestHarness; import org.apache.solr.util.TestInjection; import org.apache.zookeeper.KeeperException; +import org.hamcrest.MatcherAssert; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -168,6 +169,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.URL_SCHEME; import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; +import static org.hamcrest.core.StringContains.containsString; /** * A junit4 Solr test harness that extends SolrTestCase and, by extension, LuceneTestCase. @@ -225,6 +227,18 @@ public static void writeCoreProperties(Path coreDirectory, Properties properties properties.store(writer, testname); } } + protected void assertExceptionThrownWithMessageContaining( + Class expectedType, + List expectedStrings, + ThrowingRunnable runnable) { + Throwable thrown = expectThrows(expectedType, runnable); + + if (expectedStrings != null) { + for (String expectedString : expectedStrings) { + MatcherAssert.assertThat(thrown.getMessage(), containsString(expectedString)); + } + } + } /** * Annotation for test classes that want to disable SSL