Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

/**
* This class contains methods dealing with Config Sets and called for Config Set API execution,
* called from the {@link OverseerConfigSetMessageHandler} or from {@link
* called from {@link
* org.apache.solr.cloud.api.collections.DistributedCollectionConfigSetCommandRunner#runConfigSetCommand}
* depending on whether Collection and Config Set APIs are Overseer based or distributed.
*/
Expand Down
5 changes: 4 additions & 1 deletion solr/core/src/java/org/apache/solr/cloud/LockTree.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.cloud.OverseerMessageHandler.Lock;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CollectionParams.LockLevel;
import org.apache.solr.common.util.StrUtils;
Expand All @@ -38,6 +37,10 @@ public class LockTree {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Node root = new Node(null, LockLevel.CLUSTER, null);

public interface Lock {
void unlock();
}

private class LockImpl implements Lock {
final Node node;

Expand Down
81 changes: 17 additions & 64 deletions solr/core/src/java/org/apache/solr/cloud/Overseer.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,8 @@
* <ul>
* <li>Cluster State updates, i.e. updating Collections' <code>state.json</code> files in
* ZooKeeper, see {@link ClusterStateUpdater},
* <li>Collection API implementation, see {@link OverseerCollectionConfigSetProcessor} and {@link
* <li>Collection API implementation, see {@link OverseerTaskProcessor} and {@link
* OverseerCollectionMessageHandler} (and the example below),
* <li>Updating Config Sets, see {@link OverseerCollectionConfigSetProcessor} and {@link
* OverseerConfigSetMessageHandler},
* </ul>
*
* <p>The nodes in the cluster communicate with the Overseer over queues implemented in ZooKeeper.
Expand All @@ -108,8 +106,8 @@
* <li>The <b>state update queue</b>, through which nodes request the Overseer to update the
* <code>state.json</code> file of a Collection in ZooKeeper. This queue is in Zookeeper at
* <code>/overseer/queue</code>,
* <li>A queue shared between <b>Collection API and Config Set API</b> requests. This queue is in
* Zookeeper at <code>/overseer/collection-queue-work</code>.
* <li>A queue for <b>Collection API</b> requests. This queue is in Zookeeper at <code>
* /overseer/collection-queue-work</code>.
* </ol>
*
* <p>An example of the steps involved in the Overseer processing a Collection creation API call:
Expand All @@ -119,8 +117,8 @@
* cluster,
* <li>The node (via {@link CollectionsHandler}) enqueues the request into the <code>
* /overseer/collection-queue-work</code> queue in ZooKeepeer,
* <li>The {@link OverseerCollectionConfigSetProcessor} running on the Overseer node dequeues the
* message and using an executor service with a maximum pool size of {@link
* <li>The {@link OverseerTaskProcessor} running on the Overseer node dequeues the message and
* using an executor service with a maximum pool size of {@link
* OverseerTaskProcessor#MAX_PARALLEL_TASKS} hands it for processing to {@link
* OverseerCollectionMessageHandler},
* <li>Command {@link CreateCollectionCmd} then executes and does:
Expand Down Expand Up @@ -488,11 +486,10 @@ private void checkIfIamStillLeader() {
try {
Map<?, ?> m = (Map<?, ?>) Utils.fromJSON(data);
String id = (String) m.get(ID);
if (overseerCollectionConfigSetProcessor.getId().equals(id)) {
if (overseerTaskProcessor.getId().equals(id)) {
try {
log.warn(
"I (id={}) am exiting, but I'm still the leader",
overseerCollectionConfigSetProcessor.getId());
"I (id={}) am exiting, but I'm still the leader", overseerTaskProcessor.getId());
zkClient.delete(path, stat.getVersion(), true);
} catch (KeeperException.BadVersionException e) {
// no problem ignore it some other Overseer has already taken over
Expand Down Expand Up @@ -592,7 +589,7 @@ private List<ZkWriteCommand> processMessage(
if (log.isInfoEnabled()) {
log.info("Quit command received {} {}", message, LeaderElector.getNodeName(myId));
}
overseerCollectionConfigSetProcessor.close();
overseerTaskProcessor.close();
close();
} else {
log.warn("Overseer received wrong QUIT message {}", message);
Expand Down Expand Up @@ -695,7 +692,7 @@ public boolean isClosed() {

private final String adminPath;

private OverseerCollectionConfigSetProcessor overseerCollectionConfigSetProcessor;
private OverseerTaskProcessor overseerTaskProcessor;

private ZkController zkController;

Expand Down Expand Up @@ -766,21 +763,21 @@ public synchronized void start(String id) {
// as we have an Overseer, we need to support this.
OverseerNodePrioritizer overseerPrioritizer =
new OverseerNodePrioritizer(reader, this, adminPath, shardHandler.getShardHandlerFactory());
overseerCollectionConfigSetProcessor =
new OverseerCollectionConfigSetProcessor(
overseerTaskProcessor =
new OverseerTaskProcessor(
reader,
id,
shardHandler,
shardHandler.getShardHandlerFactory(),
adminPath,
stats,
Overseer.this,
overseerPrioritizer,
this.getCollectionQueue(reader.getZkClient(), stats),
Overseer.getRunningMap(reader.getZkClient()),
Overseer.getCompletedMap(reader.getZkClient()),
Overseer.getFailureMap(reader.getZkClient()),
solrMetricsContext);
ccThread =
new OverseerThread(
ccTg,
overseerCollectionConfigSetProcessor,
"OverseerCollectionConfigSetProcessor-" + id);
ccThread = new OverseerThread(ccTg, overseerTaskProcessor, "OverseerTaskProcessor-" + id);
ccThread.setDaemon(true);

updaterThread.start();
Expand Down Expand Up @@ -1137,50 +1134,6 @@ OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats)
return new OverseerTaskQueue(zkClient, "/overseer/collection-queue-work", zkStats);
}

/**
* Get queue that can be used to submit configset API tasks to the Overseer.
*
* <p>This queue is used internally by the {@link org.apache.solr.handler.admin.ConfigSetsHandler}
* to submit tasks which are executed by the {@link OverseerConfigSetMessageHandler}. The actions
* supported by this queue are listed in the {@link
* org.apache.solr.common.params.ConfigSetParams.ConfigSetAction} enum.
*
* <p>Performance statistics on the returned queue are <em>not</em> tracked by the Overseer Stats
* API, see {@link
* org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link ZkDistributedQueue} object
*/
OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient) {
return getConfigSetQueue(zkClient, new Stats());
}

/**
* Get queue that can be used to read configset API tasks to the Overseer.
*
* <p>This queue is used internally by the {@link OverseerConfigSetMessageHandler} to read
* configset API tasks submitted by the {@link org.apache.solr.handler.admin.ConfigSetsHandler}.
* The actions supported by this queue are listed in the {@link
* org.apache.solr.common.params.ConfigSetParams.ConfigSetAction} enum.
*
* <p>Performance statistics on the returned queue are tracked by the Overseer Stats API, see
* {@link org.apache.solr.common.params.CollectionParams.CollectionAction#OVERSEERSTATUS}.
*
* <p>For now, this internally returns the same queue as {@link #getCollectionQueue(SolrZkClient,
* Stats)}. It is the responsibility of the client to ensure that configset API actions are
* prefixed with {@link OverseerConfigSetMessageHandler#CONFIGSETS_ACTION_PREFIX} so that it is
* processed by {@link OverseerConfigSetMessageHandler}.
*
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link ZkDistributedQueue} object
*/
OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats) {
// For now, we use the same queue as the collection queue, but ensure
// that the actions are prefixed with a unique string.
return getCollectionQueue(zkClient, zkStats);
}

private void createOverseerNode(final SolrZkClient zkClient) {
try {
zkClient.create("/overseer", new byte[0], CreateMode.PERSISTENT, true);
Expand Down

This file was deleted.

Loading