From fbd659252378c1a4256510b1b928a4e3089ed108 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Thu, 14 Apr 2016 16:21:35 +0100 Subject: [PATCH 01/15] SOLR-8323 --- .../org/apache/solr/cloud/ZkController.java | 21 +- .../client/solrj/impl/CloudSolrClient.java | 36 +++ .../cloud/CollectionStatePredicate.java | 41 ++++ .../common/cloud/CollectionStateWatcher.java | 42 ++++ .../solr/common/cloud/DocCollection.java | 37 ++- .../org/apache/solr/common/cloud/Replica.java | 10 +- .../org/apache/solr/common/cloud/Slice.java | 16 +- .../solr/common/cloud/ZkStateReader.java | 216 ++++++++++++++-- .../cloud/TestCollectionStateWatchers.java | 231 ++++++++++++++++++ .../solr/cloud/MiniSolrCloudCluster.java | 36 ++- 10 files changed, 636 insertions(+), 50 deletions(-) create mode 100644 solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java create mode 100644 solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java create mode 100644 solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index bdd6a62a9248..83392e4e1fbe 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1218,23 +1218,10 @@ public void unregister(String coreName, CoreDescriptor cd) throws InterruptedExc if (context != null) { context.cancelElection(); } - - final Collection cores = cc.getCores(); - - // if there is no SolrCore which is a member of this collection, remove the watch + CloudDescriptor cloudDescriptor = cd.getCloudDescriptor(); - boolean removeWatch = true; - for (SolrCore solrCore : cores) { - final CloudDescriptor cloudDesc = solrCore.getCoreDescriptor().getCloudDescriptor(); - if (cloudDesc != null && cloudDescriptor.getCollectionName().equals(cloudDesc.getCollectionName())) { - removeWatch = false; - break; - } - } - - if (removeWatch) { - zkStateReader.removeZKWatch(collection); - } + zkStateReader.unregisterCore(cloudDescriptor.getCollectionName()); + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName, ZkStateReader.NODE_NAME_PROP, getNodeName(), @@ -1484,7 +1471,7 @@ public void preRegister(CoreDescriptor cd) { "Collection {} not visible yet, but flagging it so a watch is registered when it becomes visible" : "Registering watch for collection {}", collectionName); - zkStateReader.addCollectionWatch(collectionName); + zkStateReader.registerCore(collectionName); } catch (KeeperException e) { log.error("", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java index 52b912c0835c..e17921596bbb 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -56,6 +56,8 @@ import org.apache.solr.common.ToleratedUpdateError; import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.CollectionStatePredicate; +import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.ImplicitDocRouter; @@ -572,6 +574,40 @@ public void downloadConfig(String configName, Path downloadPath) throws IOExcept zkStateReader.getConfigManager().downloadConfigDir(configName, downloadPath); } + /** + * Block until a collection state matches a predicate, or a timeout + * + * Note that the predicate may be called again even after it has returned true, so + * implementors should avoid changing state within the predicate call itself. + * + * @param collection the collection to watch + * @param wait how long to wait + * @param unit the units of the wait parameter + * @param predicate a {@link CollectionStatePredicate} to check the collection state + * @throws InterruptedException on interrupt + * @throws TimeoutException on timeout + */ + public void waitForState(String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate) + throws InterruptedException, TimeoutException { + connect(); + zkStateReader.waitForState(collection, wait, unit, predicate); + } + + /** + * Register a CollectionStateWatcher to be called when the cluster state for a collection changes + * + * Note that the watcher is unregistered after it has been called once. To make a watcher persistent, + * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} + * call + * + * @param collection the collection to watch + * @param watcher a watcher that will be called when the state changes + */ + public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) { + connect(); + zkStateReader.registerCollectionStateWatcher(collection, watcher); + } + private NamedList directUpdate(AbstractUpdateRequest request, String collection, ClusterState clusterState) throws SolrServerException { UpdateRequest updateRequest = (UpdateRequest) request; ModifiableSolrParams params = (ModifiableSolrParams) request.getParams(); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java new file mode 100644 index 000000000000..3a995123a6e6 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java @@ -0,0 +1,41 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Interface to determine if a collection state matches a required state + * + * @see ZkStateReader#waitForState(String, long, TimeUnit, CollectionStatePredicate) + */ +public interface CollectionStatePredicate { + + /** + * Check the collection state matches a required state + * + * The collectionState parameter may be null if the collection does not exist + * or has been deleted + * + * Note that both liveNodes and collectionState should be consulted to determine + * the overall state. + */ + boolean matches(Set liveNodes, DocCollection collectionState); + +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java new file mode 100644 index 000000000000..0bf66b012e8b --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java @@ -0,0 +1,42 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +import java.util.Set; + +/** + * Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)} + * and called whenever the collection state changes. + */ +public interface CollectionStateWatcher { + + /** + * Called when the collection we are registered against has a change of state + * + * Note that, due to the way Zookeeper watchers are implemented, a single call may be + * the result of several state changes + * + * A watcher is unregistered after it has been called once. To make a watcher persistent, + * implementors should re-register during this call. + * + * @param liveNodes the set of live nodes + * @param collectionState the new collection state + */ + void onStateChanged(Set liveNodes, DocCollection collectionState); + +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java index e8f26e1a4ccc..138255c589b2 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java @@ -22,6 +22,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; @@ -35,7 +37,8 @@ /** * Models a Collection in zookeeper (but that Java name is obviously taken, hence "DocCollection") */ -public class DocCollection extends ZkNodeProps { +public class DocCollection extends ZkNodeProps implements Iterable { + public static final String DOC_ROUTER = "router"; public static final String SHARDS = "shards"; public static final String STATE_FORMAT = "stateFormat"; @@ -209,4 +212,36 @@ public Replica getReplica(String coreNodeName) { } return null; } + + /** + * Check that all replicas in a collection are live + * + * @see CollectionStatePredicate + */ + public static boolean isFullyActive(Set liveNodes, DocCollection collectionState) { + Objects.requireNonNull(liveNodes); + if (collectionState == null) + return false; + for (Slice slice : collectionState) { + for (Replica replica : slice) { + if (replica.isActive(liveNodes) == false) + return false; + } + } + return true; + } + + /** + * Returns true if the passed in DocCollection is null + * + * @see CollectionStatePredicate + */ + public static boolean isDeleted(Set liveNodes, DocCollection collectionState) { + return collectionState == null; + } + + @Override + public Iterator iterator() { + return slices.values().iterator(); + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java index 3a31d195658a..7015dfbfdd57 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java @@ -16,13 +16,15 @@ */ package org.apache.solr.common.cloud; -import static org.apache.solr.common.cloud.ZkStateReader.*; - import java.util.Locale; import java.util.Map; +import java.util.Set; import org.noggit.JSONUtil; +import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; + public class Replica extends ZkNodeProps { /** @@ -116,6 +118,10 @@ public State getState() { return state; } + public boolean isActive(Set liveNodes) { + return liveNodes.contains(this.nodeName) && this.state == State.ACTIVE; + } + @Override public String toString() { return name + ':' + JSONUtil.toJSON(propMap, -1); // small enough, keep it on one line (i.e. no indent) diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java index 369edbb102c4..ca52ba973b03 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java @@ -16,20 +16,26 @@ */ package org.apache.solr.common.cloud; -import org.noggit.JSONUtil; -import org.noggit.JSONWriter; - import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Locale; import java.util.Map; +import org.noggit.JSONUtil; +import org.noggit.JSONWriter; + /** * A Slice contains immutable information about a logical shard (all replicas that share the same shard id). */ -public class Slice extends ZkNodeProps { - +public class Slice extends ZkNodeProps implements Iterable { + + @Override + public Iterator iterator() { + return replicas.values().iterator(); + } + /** The slice's state. */ public enum State { diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index c6f88c06b4be..563dc7172f9b 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -31,7 +31,10 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.solr.common.Callable; @@ -114,6 +117,7 @@ public class ZkStateReader implements Closeable { /** Collections tracked in the legacy (shared) state format, reflects the contents of clusterstate.json. */ private Map legacyCollectionStates = emptyMap(); + /** Last seen ZK version of clusterstate.json. */ private int legacyClusterStateVersion = 0; @@ -131,6 +135,19 @@ public class ZkStateReader implements Closeable { private final Runnable securityNodeListener; + private Map collectionWatches = new ConcurrentHashMap<>(); + + private class CollectionWatch { + + int coreRefCount = 0; + Set stateWatchers = new HashSet<>(); + + public boolean canBeRemoved() { + return coreRefCount + stateWatchers.size() == 0; + } + + } + public static final Set KNOWN_CLUSTER_PROPS = unmodifiableSet(new HashSet<>(asList( LEGACY_CLOUD, URL_SCHEME, @@ -487,6 +504,12 @@ private void refreshLegacyClusterState(Watcher watcher) } this.legacyCollectionStates = loadedData.getCollectionStates(); this.legacyClusterStateVersion = stat.getVersion(); + for (Map.Entry entry : this.legacyCollectionStates.entrySet()) { + if (entry.getValue().isLazilyLoaded() == false) { + // a watched collection - trigger notifications + notifyStateWatchers(entry.getKey(), entry.getValue().get()); + } + } } } catch (KeeperException.NoNodeException e) { // Ignore missing legacy clusterstate.json. @@ -1068,19 +1091,190 @@ public static String getCollectionPath(String coll) { return COLLECTIONS_ZKNODE+"/"+coll + "/state.json"; } - public void addCollectionWatch(String coll) { - if (interestingCollections.add(coll)) { - LOG.info("addZkWatch [{}]", coll); - new StateWatcher(coll).refreshAndWatch(false); + /** + * Notify this reader that a local Core is a member of a collection, and so that collection + * state should be watched. + * + * Not a public API. This method should only be called from ZkController. + * + * The number of cores per-collection is tracked, and adding multiple cores from the same + * collection does not increase the number of watches. + * + * @param collection the collection that the core is a member of + * + * @see ZkStateReader#unregisterCore(String) + */ + public void registerCore(String collection) { + AtomicBoolean reconstructState = new AtomicBoolean(false); + collectionWatches.compute(collection, (k, v) -> { + interestingCollections.add(collection); + if (v == null) { + reconstructState.set(true); + v = new CollectionWatch(); + } + v.coreRefCount++; + return v; + }); + if (reconstructState.get()) { + new StateWatcher(collection).refreshAndWatch(false); + synchronized (getUpdateLock()) { + constructState(); + } + } + } + + /** + * Notify this reader that a local core that is a member of a collection has been closed. + * + * Not a public API. This method should only be called from ZkController. + * + * If no cores are registered for a collection, and there are no {@link CollectionStateWatcher}s + * for that collection either, the collection watch will be removed. + * + * @param collection the collection that the core belongs to + */ + public void unregisterCore(String collection) { + AtomicBoolean reconstructState = new AtomicBoolean(false); + collectionWatches.compute(collection, (k, v) -> { + if (v == null) + return null; + if (v.coreRefCount > 0) + v.coreRefCount--; + if (v.canBeRemoved()) { + interestingCollections.remove(collection); + watchedCollectionStates.remove(collection); + lazyCollectionStates.put(collection, new LazyCollectionRef(collection)); + reconstructState.set(true); + return null; + } + return v; + }); + if (reconstructState.get()) { + synchronized (getUpdateLock()) { + constructState(); + } + } + } + + /** + * Register a CollectionStateWatcher to be called when the state of a collection changes + * + * A given CollectionStateWatcher will be only called once. If you want to have a persistent watcher, + * it should register itself again in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} + * method. + */ + public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) { + AtomicBoolean watchSet = new AtomicBoolean(false); + collectionWatches.compute(collection, (k, v) -> { + if (v == null) { + interestingCollections.add(collection); + v = new CollectionWatch(); + watchSet.set(true); + } + v.stateWatchers.add(stateWatcher); + return v; + }); + if (watchSet.get()) { + new StateWatcher(collection).refreshAndWatch(false); synchronized (getUpdateLock()) { constructState(); } } } + /** + * Block until a CollectionStatePredicate returns true, or the wait times out + * + * Note that the predicate may be called again even after it has returned true, so + * implementors should avoid changing state within the predicate call itself. + * + * @param collection the collection to watch + * @param wait how long to wait + * @param unit the units of the wait parameter + * @param predicate the predicate to call on state changes + * @throws InterruptedException on interrupt + * @throws TimeoutException on timeout + */ + public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate) + throws InterruptedException, TimeoutException { + + final CountDownLatch latch = new CountDownLatch(1); + + CollectionStateWatcher watcher = new CollectionStateWatcher() { + @Override + public void onStateChanged(Set liveNodes, DocCollection collectionState) { + if (predicate.matches(liveNodes, collectionState)) { + latch.countDown(); + } else { + registerCollectionStateWatcher(collection, this); + } + } + }; + registerCollectionStateWatcher(collection, watcher); + + try { + // check the current state + DocCollection dc = clusterState.getCollectionOrNull(collection); + if (predicate.matches(liveNodes, dc)) + return; + + // wait for the watcher predicate to return true, or time out + if (!latch.await(wait, unit)) + throw new TimeoutException(); + + } + finally { + removeCollectionStateWatcher(collection, watcher); + } + } + + /** + * Remove a watcher from a collection's watch list. + * + * This allows Zookeeper watches to be removed if there is no interest in the + * collection. + * + * @param collection the collection + * @param watcher the watcher + */ + public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) { + collectionWatches.compute(collection, (k, v) -> { + if (v == null) + return null; + v.stateWatchers.remove(watcher); + if (v.canBeRemoved()) + return null; + return v; + }); + } + + private void notifyStateWatchers(String collection, DocCollection collectionState) { + List watchers = new ArrayList<>(); + collectionWatches.compute(collection, (k, v) -> { + if (v == null) + return null; + watchers.addAll(v.stateWatchers); + v.stateWatchers.clear(); + return v; + }); + for (CollectionStateWatcher watcher : watchers) { + watcher.onStateChanged(liveNodes, collectionState); + } + } + + /* package-private for testing */ + Set getStateWatchers(String collection) { + CollectionWatch watch = collectionWatches.get(collection); + if (watch == null) + return null; + return new HashSet<>(watch.stateWatchers); + } + private void updateWatchedCollection(String coll, DocCollection newState) { + if (newState == null) { LOG.info("Deleting data for [{}]", coll); + notifyStateWatchers(coll, null); watchedCollectionStates.remove(coll); return; } @@ -1094,6 +1288,7 @@ private void updateWatchedCollection(String coll, DocCollection newState) { if (oldState == null) { if (watchedCollectionStates.putIfAbsent(coll, newState) == null) { LOG.info("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion()); + notifyStateWatchers(coll, newState); break; } } else { @@ -1103,6 +1298,7 @@ private void updateWatchedCollection(String coll, DocCollection newState) { } if (watchedCollectionStates.replace(coll, oldState, newState)) { LOG.info("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion()); + notifyStateWatchers(coll, newState); break; } } @@ -1113,17 +1309,7 @@ private void updateWatchedCollection(String coll, DocCollection newState) { watchedCollectionStates.remove(coll); LOG.info("Removing uninteresting collection [{}]", coll); } - } - - /** This is not a public API. Only used by ZkController */ - public void removeZKWatch(String coll) { - LOG.info("Removing watch for uninteresting collection [{}]", coll); - interestingCollections.remove(coll); - watchedCollectionStates.remove(coll); - lazyCollectionStates.put(coll, new LazyCollectionRef(coll)); - synchronized (getUpdateLock()) { - constructState(); - } + } public static class ConfigData { diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java new file mode 100644 index 000000000000..842216c03614 --- /dev/null +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java @@ -0,0 +1,231 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.HashMap; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrjNamedThreadFactory; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.hamcrest.core.Is.is; + +public class TestCollectionStateWatchers extends SolrCloudTestCase { + + private static final int CLUSTER_SIZE = 4; + + private static final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool( + new SolrjNamedThreadFactory("backgroundWatchers") + ); + + private static final int MAX_WAIT_TIMEOUT = 30; + + @BeforeClass + public static void startCluster() throws Exception { + configureCluster(CLUSTER_SIZE) + .addConfig("config", getFile("solrj/solr/collection1/conf").toPath()) + .configure(); + } + + @AfterClass + public static void shutdownBackgroundExecutors() { + executor.shutdown(); + } + + @Before + public void prepareCluster() throws Exception { + int missingServers = CLUSTER_SIZE - cluster.getJettySolrRunners().size(); + for (int i = 0; i < missingServers; i++) { + cluster.startJettySolrRunner(); + } + cluster.waitForAllNodes(30); + } + + private static Future waitInBackground(String collection, long timeout, TimeUnit unit, + CollectionStatePredicate predicate) { + return executor.submit(() -> { + try { + cluster.getSolrClient().waitForState(collection, timeout, unit, predicate); + } catch (InterruptedException | TimeoutException e) { + return Boolean.FALSE; + } + return Boolean.TRUE; + }); + } + + + @Test + public void testSimpleCollectionWatch() throws Exception { + + CloudSolrClient client = cluster.getSolrClient(); + cluster.createCollection("testcollection", CLUSTER_SIZE, 1, "config", new HashMap<>()); + + client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); + + // shutdown a node and check that we get notified about the change + final AtomicInteger nodeCount = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(1); + client.registerCollectionStateWatcher("testcollection", (liveNodes, collectionState) -> { + // we can't just count liveNodes here, because that's updated by a separate watcher, + // and it may be the case that we're triggered by a node setting itself to DOWN before + // the liveNodes watcher is called + for (Slice slice : collectionState) { + for (Replica replica : slice) { + if (replica.isActive(liveNodes)) + nodeCount.incrementAndGet(); + } + } + latch.countDown(); + }); + + cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); + assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); + + assertThat(nodeCount.intValue(), is(3)); + + } + + @Test + public void testWaitForStateChecksCurrentState() throws Exception { + + CloudSolrClient client = cluster.getSolrClient(); + cluster.createCollection("waitforstate", 1, 1, "config", new HashMap<>()); + + client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); + + // several goes, to check that we're not getting delayed state changes + for (int i = 0; i < 10; i++) { + try { + client.waitForState("waitforstate", 1, TimeUnit.SECONDS, DocCollection::isFullyActive); + } + catch (TimeoutException e) { + fail("waitForState should return immediately if the predicate is already satisfied"); + } + } + + } + + @Test + public void testCanWatchForNonexistantCollection() throws Exception { + + Future future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); + cluster.createCollection("delayed", 1, 1, "config", new HashMap<>()); + assertTrue("waitForState was not triggered by collection creation", future.get()); + + } + + @Test + public void testPredicateFailureTimesOut() throws Exception { + CloudSolrClient client = cluster.getSolrClient(); + expectThrows(TimeoutException.class, () -> { + client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false)); + }); + } + + @Test + public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception { + + CloudSolrClient client = cluster.getSolrClient(); + cluster.createCollection("falsepredicate", 4, 1, "config", new HashMap<>()); + client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); + + final CountDownLatch firstCall = new CountDownLatch(1); + + // stop a node, then add a watch waiting for all nodes to be back up + JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); + + Future future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (liveNodes, collectionState) -> { + firstCall.countDown(); + return DocCollection.isFullyActive(liveNodes, collectionState); + }); + + // first, stop another node; the watch should not be fired after this! + JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); + + // now start them both back up + cluster.startJettySolrRunner(node1); + assertTrue("CollectionStateWatcher not called after 30 seconds", firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); + cluster.startJettySolrRunner(node2); + + Boolean result = future.get(); + assertTrue("Did not see a fully active cluster after 30 seconds", result); + + } + + @Test + public void testWatcherIsRemovedAfterTimeout() { + CloudSolrClient client = cluster.getSolrClient(); + assertTrue("There should be no watchers for a non-existent collection!", + client.getZkStateReader().getStateWatchers("no-such-collection") == null); + + expectThrows(TimeoutException.class, () -> { + client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, DocCollection::isFullyActive); + }); + + Set watchers = client.getZkStateReader().getStateWatchers("no-such-collection"); + assertTrue("Watchers for collection should be removed after timeout", + watchers == null || watchers.size() == 0); + + } + + @Test + public void testDeletionsTriggerWatches() throws Exception { + cluster.createCollection("tobedeleted", 1, 1, "config", new HashMap<>()); + Future future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isDeleted); + + CollectionAdminRequest.deleteCollection("tobedeleted").process(cluster.getSolrClient()); + + assertTrue("CollectionStateWatcher not notified of delete call after 30 seconds", future.get()); + } + + @Test + public void testWatchesWorkForStateFormat1() throws Exception { + + final CloudSolrClient client = cluster.getSolrClient(); + + Future future + = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); + + CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1) + .processAndWait(client, MAX_WAIT_TIMEOUT); + assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation", future.get()); + + Future migrated + = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (n, c) -> c != null && c.getStateFormat() == 2); + + CollectionAdminRequest.migrateCollectionFormat("stateformat1").processAndWait(client, MAX_WAIT_TIMEOUT); + assertTrue("CollectionStateWatcher did not persist over state format migration", migrated.get()); + + } + +} diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java index d23b37cd4b87..f51116246eb5 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.SortedMap; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; @@ -181,8 +182,8 @@ public MiniSolrCloudCluster(int numServers, Path baseDir, String solrXml, JettyC */ public MiniSolrCloudCluster(int numServers, Path baseDir, String solrXml, JettyConfig jettyConfig, ZkTestServer zkTestServer) throws Exception { - this.baseDir = baseDir; - this.jettyConfig = jettyConfig; + this.baseDir = Objects.requireNonNull(baseDir); + this.jettyConfig = Objects.requireNonNull(jettyConfig); Files.createDirectories(baseDir); @@ -194,8 +195,7 @@ public MiniSolrCloudCluster(int numServers, Path baseDir, String solrXml, JettyC } this.zkServer = zkTestServer; - try(SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), - AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT, null)) { + try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT)) { zkClient.makePath("/solr/solr.xml", solrXml.getBytes(Charset.defaultCharset()), true); if (jettyConfig.sslConfig != null && jettyConfig.sslConfig.isSSLMode()) { zkClient.makePath("/solr" + ZkStateReader.CLUSTER_PROPS, "{'urlScheme':'https'}".getBytes(Charsets.UTF_8), true); @@ -222,12 +222,17 @@ public MiniSolrCloudCluster(int numServers, Path baseDir, String solrXml, JettyC throw startupError; } - try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), - AbstractZkTestCase.TIMEOUT, 45000, null)) { + waitForAllNodes(numServers, 60); + + solrClient = buildSolrClient(); + } + + private void waitForAllNodes(int numServers, int timeout) throws IOException, InterruptedException { + try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT)) { int numliveNodes = 0; - int retries = 60; + int retries = timeout; String liveNodesPath = "/solr/live_nodes"; - // Wait up to 60 seconds for number of live_nodes to match up number of servers + // Wait up to {timeout} seconds for number of live_nodes to match up number of servers do { if (zkClient.exists(liveNodesPath, true)) { numliveNodes = zkClient.getChildren(liveNodesPath, null, true).size(); @@ -244,8 +249,13 @@ public MiniSolrCloudCluster(int numServers, Path baseDir, String solrXml, JettyC Thread.sleep(1000); } while (numliveNodes != numServers); } + catch (KeeperException e) { + throw new IOException("Error communicating with zookeeper", e); + } + } - solrClient = buildSolrClient(); + public void waitForAllNodes(int timeout) throws IOException, InterruptedException { + waitForAllNodes(jettys.size(), timeout); } private String newNodeName() { @@ -348,7 +358,13 @@ public JettySolrRunner stopJettySolrRunner(int index) throws Exception { return jetty; } - protected JettySolrRunner startJettySolrRunner(JettySolrRunner jetty) throws Exception { + /** + * Add a previously stopped node back to the cluster + * @param jetty a {@link JettySolrRunner} previously returned by {@link #stopJettySolrRunner(int)} + * @return the started node + * @throws Exception on error + */ + public JettySolrRunner startJettySolrRunner(JettySolrRunner jetty) throws Exception { jetty.start(); jettys.add(jetty); return jetty; From fd290d1f0262f3c844d5006f0fa64936f6b9b1d9 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Wed, 20 Apr 2016 18:58:55 +0100 Subject: [PATCH 02/15] Remove unneeded interestingcollections var --- .../solr/common/cloud/ZkStateReader.java | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 563dc7172f9b..66562c6eb3ed 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -111,10 +111,7 @@ public class ZkStateReader implements Closeable { public static final String SHARD_LEADERS_ZKNODE = "leaders"; public static final String ELECTION_NODE = "election"; - - /** Collections we actively care about, and will try to keep watch on. */ - private final Set interestingCollections = Collections.newSetFromMap(new ConcurrentHashMap<>()); - + /** Collections tracked in the legacy (shared) state format, reflects the contents of clusterstate.json. */ private Map legacyCollectionStates = emptyMap(); @@ -473,7 +470,7 @@ private void constructState() { this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion); LOG.debug("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]", legacyCollectionStates.keySet().size(), - interestingCollections.size(), + collectionWatches.keySet().size(), watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(), clusterState.getCollectionStates().size()); @@ -481,7 +478,7 @@ private void constructState() { if (LOG.isTraceEnabled()) { LOG.trace("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]", legacyCollectionStates.keySet(), - interestingCollections, + collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(), clusterState.getCollectionStates()); @@ -525,7 +522,7 @@ private void refreshLegacyClusterState(Watcher watcher) */ private void refreshStateFormat2Collections() { // It's okay if no format2 state.json exists, if one did not previous exist. - for (String coll : interestingCollections) { + for (String coll : collectionWatches.keySet()) { new StateWatcher(coll).refreshAndWatch(watchedCollectionStates.containsKey(coll)); } } @@ -566,7 +563,7 @@ private void refreshCollectionList(Watcher watcher) throws KeeperException, Inte this.lazyCollectionStates.keySet().retainAll(children); for (String coll : children) { // We will create an eager collection for any interesting collections, so don't add to lazy. - if (!interestingCollections.contains(coll)) { + if (!collectionWatches.containsKey(coll)) { // Double check contains just to avoid allocating an object. LazyCollectionRef existing = lazyCollectionStates.get(coll); if (existing == null) { @@ -908,7 +905,7 @@ public void process(WatchedEvent event) { return; } - if (!interestingCollections.contains(coll)) { + if (!collectionWatches.containsKey(coll)) { // This collection is no longer interesting, stop watching. LOG.info("Uninteresting collection {}", coll); return; @@ -1107,7 +1104,6 @@ public static String getCollectionPath(String coll) { public void registerCore(String collection) { AtomicBoolean reconstructState = new AtomicBoolean(false); collectionWatches.compute(collection, (k, v) -> { - interestingCollections.add(collection); if (v == null) { reconstructState.set(true); v = new CollectionWatch(); @@ -1141,7 +1137,6 @@ public void unregisterCore(String collection) { if (v.coreRefCount > 0) v.coreRefCount--; if (v.canBeRemoved()) { - interestingCollections.remove(collection); watchedCollectionStates.remove(collection); lazyCollectionStates.put(collection, new LazyCollectionRef(collection)); reconstructState.set(true); @@ -1167,7 +1162,6 @@ public void registerCollectionStateWatcher(String collection, CollectionStateWat AtomicBoolean watchSet = new AtomicBoolean(false); collectionWatches.compute(collection, (k, v) -> { if (v == null) { - interestingCollections.add(collection); v = new CollectionWatch(); watchSet.set(true); } @@ -1281,7 +1275,7 @@ private void updateWatchedCollection(String coll, DocCollection newState) { // CAS update loop while (true) { - if (!interestingCollections.contains(coll)) { + if (!collectionWatches.containsKey(coll)) { break; } DocCollection oldState = watchedCollectionStates.get(coll); @@ -1305,7 +1299,7 @@ private void updateWatchedCollection(String coll, DocCollection newState) { } // Resolve race with removeZKWatch. - if (!interestingCollections.contains(coll)) { + if (!collectionWatches.containsKey(coll)) { watchedCollectionStates.remove(coll); LOG.info("Removing uninteresting collection [{}]", coll); } From 9d842d7ffc97293c2a573394c2b81884ea9b755e Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Wed, 20 Apr 2016 21:38:04 +0100 Subject: [PATCH 03/15] Fix test compilation in core-tests; nuke unused param on refreshAndWatch --- .../solr/cloud/overseer/ZkStateReaderTest.java | 8 ++++---- .../apache/solr/common/cloud/ZkStateReader.java | 17 +++++------------ 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java index fd8d49391768..2964d7b01d2a 100644 --- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java @@ -74,7 +74,7 @@ public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting ZkStateReader reader = new ZkStateReader(zkClient); reader.createClusterStateWatchersAndUpdate(); if (isInteresting) { - reader.addCollectionWatch("c1"); + reader.registerCore("c1"); } ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats()); @@ -170,9 +170,9 @@ public void testExternalCollectionWatchedNotWatched() throws Exception{ reader.forceUpdateCollection("c1"); assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); - reader.addCollectionWatch("c1"); + reader.registerCore("c1"); assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); - reader.removeZKWatch("c1"); + reader.unregisterCore("c1"); assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); } finally { @@ -198,7 +198,7 @@ public void testWatchedCollectionCreation() throws Exception { ZkStateReader reader = new ZkStateReader(zkClient); reader.createClusterStateWatchersAndUpdate(); - reader.addCollectionWatch("c1"); + reader.registerCore("c1"); // Initially there should be no c1 collection. assertNull(reader.getClusterState().getCollectionRef("c1")); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 66562c6eb3ed..07f3f1f23459 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -521,9 +521,8 @@ private void refreshLegacyClusterState(Watcher watcher) * Refresh state format2 collections. */ private void refreshStateFormat2Collections() { - // It's okay if no format2 state.json exists, if one did not previous exist. for (String coll : collectionWatches.keySet()) { - new StateWatcher(coll).refreshAndWatch(watchedCollectionStates.containsKey(coll)); + new StateWatcher(coll).refreshAndWatch(); } } @@ -916,7 +915,7 @@ public void process(WatchedEvent event) { LOG.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])", event, coll, liveNodesSize); - refreshAndWatch(true); + refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); } @@ -926,17 +925,11 @@ public void process(WatchedEvent event) { * Refresh collection state from ZK and leave a watch for future changes. * As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates} * with the results of the refresh. - * - * @param expectExists if true, error if no state node exists */ - public void refreshAndWatch(boolean expectExists) { + public void refreshAndWatch() { try { DocCollection newState = fetchCollectionState(coll, this); updateWatchedCollection(coll, newState); - } catch (KeeperException.NoNodeException e) { - if (expectExists) { - LOG.warn("State node vanished for collection: [{}]", coll, e); - } } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { @@ -1112,7 +1105,7 @@ public void registerCore(String collection) { return v; }); if (reconstructState.get()) { - new StateWatcher(collection).refreshAndWatch(false); + new StateWatcher(collection).refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); } @@ -1169,7 +1162,7 @@ public void registerCollectionStateWatcher(String collection, CollectionStateWat return v; }); if (watchSet.get()) { - new StateWatcher(collection).refreshAndWatch(false); + new StateWatcher(collection).refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); } From d367db7ca02cd8bf77da8794dcdc8e5d3fe4349b Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Fri, 29 Apr 2016 14:31:24 +0100 Subject: [PATCH 04/15] Incorporate feedback - CollectionStateWatcher is now an internal class on ZkStateReader, and registerCSW() has been removed - Javadocs improved - Legacy state watcher checks that state has actually changed per-collection before calling notifyStateWatchers - notifyStateWatchers takes a liveNodes param, to avoid multiple volatile reads - isDeleted() method removed --- .../client/solrj/impl/CloudSolrClient.java | 16 ----- .../cloud/CollectionStatePredicate.java | 5 +- .../common/cloud/CollectionStateWatcher.java | 42 ------------ .../solr/common/cloud/DocCollection.java | 9 --- .../solr/common/cloud/ZkStateReader.java | 66 +++++++++++-------- .../cloud/TestCollectionStateWatchers.java | 33 ++++------ 6 files changed, 52 insertions(+), 119 deletions(-) delete mode 100644 solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java index e17921596bbb..e8a967407a79 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -57,7 +57,6 @@ import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.CollectionStatePredicate; -import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.ImplicitDocRouter; @@ -593,21 +592,6 @@ public void waitForState(String collection, long wait, TimeUnit unit, Collection zkStateReader.waitForState(collection, wait, unit, predicate); } - /** - * Register a CollectionStateWatcher to be called when the cluster state for a collection changes - * - * Note that the watcher is unregistered after it has been called once. To make a watcher persistent, - * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} - * call - * - * @param collection the collection to watch - * @param watcher a watcher that will be called when the state changes - */ - public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) { - connect(); - zkStateReader.registerCollectionStateWatcher(collection, watcher); - } - private NamedList directUpdate(AbstractUpdateRequest request, String collection, ClusterState clusterState) throws SolrServerException { UpdateRequest updateRequest = (UpdateRequest) request; ModifiableSolrParams params = (ModifiableSolrParams) request.getParams(); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java index 3a995123a6e6..653e90595070 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java @@ -30,8 +30,9 @@ public interface CollectionStatePredicate { /** * Check the collection state matches a required state * - * The collectionState parameter may be null if the collection does not exist - * or has been deleted + * @param liveNodes the current set of live nodes + * @param collectionState the latest collection state, or null if the collection + * does not exist * * Note that both liveNodes and collectionState should be consulted to determine * the overall state. diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java deleted file mode 100644 index 0bf66b012e8b..000000000000 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java +++ /dev/null @@ -1,42 +0,0 @@ -package org.apache.solr.common.cloud; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -import java.util.Set; - -/** - * Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)} - * and called whenever the collection state changes. - */ -public interface CollectionStateWatcher { - - /** - * Called when the collection we are registered against has a change of state - * - * Note that, due to the way Zookeeper watchers are implemented, a single call may be - * the result of several state changes - * - * A watcher is unregistered after it has been called once. To make a watcher persistent, - * implementors should re-register during this call. - * - * @param liveNodes the set of live nodes - * @param collectionState the new collection state - */ - void onStateChanged(Set liveNodes, DocCollection collectionState); - -} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java index 843422bcc260..b40bb174a4b8 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java @@ -231,15 +231,6 @@ public static boolean isFullyActive(Set liveNodes, DocCollection collect return true; } - /** - * Returns true if the passed in DocCollection is null - * - * @see CollectionStatePredicate - */ - public static boolean isDeleted(Set liveNodes, DocCollection collectionState) { - return collectionState == null; - } - @Override public Iterator iterator() { return slices.values().iterator(); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index b85162a44e31..05ae8a7b24d3 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -132,7 +133,7 @@ public class ZkStateReader implements Closeable { private final Runnable securityNodeListener; - private Map collectionWatches = new ConcurrentHashMap<>(); + private final ConcurrentHashMap collectionWatches = new ConcurrentHashMap<>(); private class CollectionWatch { @@ -256,9 +257,10 @@ public void updateClusterState() throws KeeperException, InterruptedException { refreshLegacyClusterState(null); // Need a copy so we don't delete from what we're iterating over. Collection safeCopy = new ArrayList<>(watchedCollectionStates.keySet()); + Set liveNodes = new HashSet<>(this.liveNodes); for (String coll : safeCopy) { DocCollection newState = fetchCollectionState(coll, null); - updateWatchedCollection(coll, newState); + updateWatchedCollection(liveNodes, coll, newState); } refreshCollectionList(null); refreshLiveNodes(null); @@ -300,7 +302,7 @@ public void forceUpdateCollection(String collection) throws KeeperException, Int } else if (watchedCollectionStates.containsKey(collection)) { // Exists as a watched collection, force a refresh. DocCollection newState = fetchCollectionState(collection, null); - updateWatchedCollection(collection, newState); + updateWatchedCollection(liveNodes, collection, newState); } constructState(); } @@ -323,7 +325,7 @@ public Integer compareStateVersions(String coll, int version) { DocCollection nu = getCollectionLive(this, coll); if (nu == null) return -1 ; if (nu.getZNodeVersion() > collection.getZNodeVersion()) { - updateWatchedCollection(coll, nu); + updateWatchedCollection(liveNodes, coll, nu); collection = nu; } } @@ -491,19 +493,28 @@ private void refreshLegacyClusterState(Watcher watcher) final Stat stat = new Stat(); final byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true); final ClusterState loadedData = ClusterState.load(stat.getVersion(), data, emptySet(), CLUSTER_STATE); + final Set liveNodes = new HashSet<>(this.liveNodes); synchronized (getUpdateLock()) { if (this.legacyClusterStateVersion >= stat.getVersion()) { // Nothing to do, someone else updated same or newer. return; } - this.legacyCollectionStates = loadedData.getCollectionStates(); - this.legacyClusterStateVersion = stat.getVersion(); - for (Map.Entry entry : this.legacyCollectionStates.entrySet()) { - if (entry.getValue().isLazilyLoaded() == false) { - // a watched collection - trigger notifications - notifyStateWatchers(entry.getKey(), entry.getValue().get()); + LOG.info("Updating legacy cluster state - {} entries in legacyCollectionStates", legacyCollectionStates.size()); + for (Map.Entry watchEntry : this.collectionWatches.entrySet()) { + String coll = watchEntry.getKey(); + CollectionWatch collWatch = watchEntry.getValue(); + ClusterState.CollectionRef ref = this.legacyCollectionStates.get(coll); + if (ref == null) + continue; + // watched collection, so this will always be local + DocCollection newState = ref.get(); + if (!collWatch.stateWatchers.isEmpty() + && !Objects.equals(loadedData.getCollectionStates().get(coll).get(), newState)) { + notifyStateWatchers(liveNodes, coll, newState); } } + this.legacyCollectionStates = loadedData.getCollectionStates(); + this.legacyClusterStateVersion = stat.getVersion(); } } catch (KeeperException.NoNodeException e) { // Ignore missing legacy clusterstate.json. @@ -927,7 +938,7 @@ public void process(WatchedEvent event) { public void refreshAndWatch() { try { DocCollection newState = fetchCollectionState(coll, this); - updateWatchedCollection(coll, newState); + updateWatchedCollection(liveNodes, coll, newState); } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { @@ -1142,14 +1153,7 @@ public void unregisterCore(String collection) { } } - /** - * Register a CollectionStateWatcher to be called when the state of a collection changes - * - * A given CollectionStateWatcher will be only called once. If you want to have a persistent watcher, - * it should register itself again in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} - * method. - */ - public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) { + private void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) { AtomicBoolean watchSet = new AtomicBoolean(false); collectionWatches.compute(collection, (k, v) -> { if (v == null) { @@ -1233,7 +1237,7 @@ public void removeCollectionStateWatcher(String collection, CollectionStateWatch }); } - private void notifyStateWatchers(String collection, DocCollection collectionState) { + private void notifyStateWatchers(Set liveNodes, String collection, DocCollection collectionState) { List watchers = new ArrayList<>(); collectionWatches.compute(collection, (k, v) -> { if (v == null) @@ -1248,18 +1252,18 @@ private void notifyStateWatchers(String collection, DocCollection collectionStat } /* package-private for testing */ - Set getStateWatchers(String collection) { + int getStateWatchCount(String collection) { CollectionWatch watch = collectionWatches.get(collection); if (watch == null) - return null; - return new HashSet<>(watch.stateWatchers); + return 0; + return watch.stateWatchers.size(); } - private void updateWatchedCollection(String coll, DocCollection newState) { + private void updateWatchedCollection(Set liveNodes, String coll, DocCollection newState) { if (newState == null) { LOG.info("Deleting data for [{}]", coll); - notifyStateWatchers(coll, null); + notifyStateWatchers(liveNodes, coll, null); watchedCollectionStates.remove(coll); return; } @@ -1273,7 +1277,7 @@ private void updateWatchedCollection(String coll, DocCollection newState) { if (oldState == null) { if (watchedCollectionStates.putIfAbsent(coll, newState) == null) { LOG.info("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion()); - notifyStateWatchers(coll, newState); + notifyStateWatchers(liveNodes, coll, newState); break; } } else { @@ -1283,7 +1287,7 @@ private void updateWatchedCollection(String coll, DocCollection newState) { } if (watchedCollectionStates.replace(coll, oldState, newState)) { LOG.info("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion()); - notifyStateWatchers(coll, newState); + notifyStateWatchers(liveNodes, coll, newState); break; } } @@ -1292,7 +1296,7 @@ private void updateWatchedCollection(String coll, DocCollection newState) { // Resolve race with removeZKWatch. if (!collectionWatches.containsKey(coll)) { watchedCollectionStates.remove(coll); - LOG.info("Removing uninteresting collection [{}]", coll); + LOG.info("Unwatching collection [{}]", coll); } } @@ -1310,4 +1314,10 @@ public ConfigData(Map data, int version) { } } + + private interface CollectionStateWatcher { + + void onStateChanged(Set liveNodes, DocCollection collectionState); + + } } diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java index 842216c03614..91d0242dd00f 100644 --- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java @@ -18,13 +18,11 @@ */ import java.util.HashMap; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.CloudSolrClient; @@ -37,8 +35,6 @@ import org.junit.BeforeClass; import org.junit.Test; -import static org.hamcrest.core.Is.is; - public class TestCollectionStateWatchers extends SolrCloudTestCase { private static final int CLUSTER_SIZE = 4; @@ -92,25 +88,19 @@ public void testSimpleCollectionWatch() throws Exception { client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); // shutdown a node and check that we get notified about the change - final AtomicInteger nodeCount = new AtomicInteger(0); - final CountDownLatch latch = new CountDownLatch(1); - client.registerCollectionStateWatcher("testcollection", (liveNodes, collectionState) -> { - // we can't just count liveNodes here, because that's updated by a separate watcher, - // and it may be the case that we're triggered by a node setting itself to DOWN before - // the liveNodes watcher is called - for (Slice slice : collectionState) { + Future future = waitInBackground("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (n, c) -> { + int nodecount = 0; + for (Slice slice : c) { for (Replica replica : slice) { - if (replica.isActive(liveNodes)) - nodeCount.incrementAndGet(); + if (replica.isActive(n)) + nodecount++; } } - latch.countDown(); + return nodecount == 3; }); - cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); - assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); + assertTrue("CollectionStateWatcher was never notified of cluster change", future.get()); - assertThat(nodeCount.intValue(), is(3)); } @@ -185,22 +175,21 @@ public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Excepti public void testWatcherIsRemovedAfterTimeout() { CloudSolrClient client = cluster.getSolrClient(); assertTrue("There should be no watchers for a non-existent collection!", - client.getZkStateReader().getStateWatchers("no-such-collection") == null); + client.getZkStateReader().getStateWatchCount("no-such-collection") == 0); expectThrows(TimeoutException.class, () -> { client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, DocCollection::isFullyActive); }); - Set watchers = client.getZkStateReader().getStateWatchers("no-such-collection"); - assertTrue("Watchers for collection should be removed after timeout", - watchers == null || watchers.size() == 0); + long count = client.getZkStateReader().getStateWatchCount("no-such-collection"); + assertTrue("Watchers for collection should be removed after timeout", count == 0); } @Test public void testDeletionsTriggerWatches() throws Exception { cluster.createCollection("tobedeleted", 1, 1, "config", new HashMap<>()); - Future future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isDeleted); + Future future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (n, c) -> c == null); CollectionAdminRequest.deleteCollection("tobedeleted").process(cluster.getSolrClient()); From 95bee1d4e44b3c1e358d70cd479ad55ff7e94372 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Sat, 30 Apr 2016 11:08:51 +0100 Subject: [PATCH 05/15] Move notify calls out of synchronized blocks --- .../cloud/CollectionStatePredicate.java | 5 +- .../solr/common/cloud/ZkStateReader.java | 80 +++++++++++++------ .../cloud/TestCollectionStateWatchers.java | 3 + 3 files changed, 62 insertions(+), 26 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java index 653e90595070..1112daa0143d 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java @@ -30,12 +30,13 @@ public interface CollectionStatePredicate { /** * Check the collection state matches a required state * + * Note that both liveNodes and collectionState should be consulted to determine + * the overall state. + * * @param liveNodes the current set of live nodes * @param collectionState the latest collection state, or null if the collection * does not exist * - * Note that both liveNodes and collectionState should be consulted to determine - * the overall state. */ boolean matches(Set liveNodes, DocCollection collectionState); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 05ae8a7b24d3..582402b20307 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -247,6 +247,8 @@ public ZkConfigManager getConfigManager() { */ @Deprecated public void updateClusterState() throws KeeperException, InterruptedException { + List collectionsToNotify = new ArrayList<>(); + Set liveNodes = this.liveNodes; // volatile read synchronized (getUpdateLock()) { if (clusterState == null) { // Never initialized, just run normal initialization. @@ -254,18 +256,22 @@ public void updateClusterState() throws KeeperException, InterruptedException { return; } // No need to set watchers because we should already have watchers registered for everything. - refreshLegacyClusterState(null); + collectionsToNotify.addAll(refreshLegacyClusterState(null, liveNodes)); // Need a copy so we don't delete from what we're iterating over. Collection safeCopy = new ArrayList<>(watchedCollectionStates.keySet()); - Set liveNodes = new HashSet<>(this.liveNodes); for (String coll : safeCopy) { DocCollection newState = fetchCollectionState(coll, null); - updateWatchedCollection(liveNodes, coll, newState); + if (updateWatchedCollection(liveNodes, coll, newState)) { + collectionsToNotify.add(coll); + } } refreshCollectionList(null); refreshLiveNodes(null); constructState(); } + for (String collection : collectionsToNotify) { + notifyStateWatchers(liveNodes, collection, watchedCollectionStates.get(collection)); + } } /** @@ -273,6 +279,9 @@ public void updateClusterState() throws KeeperException, InterruptedException { * a better design is possible. */ public void forceUpdateCollection(String collection) throws KeeperException, InterruptedException { + boolean changed = false; + List changedLegacyCollections = null; + Set liveNodes = this.liveNodes; // volatile read synchronized (getUpdateLock()) { if (clusterState == null) { return; @@ -282,7 +291,7 @@ public void forceUpdateCollection(String collection) throws KeeperException, Int if (ref == null || legacyCollectionStates.containsKey(collection)) { // We either don't know anything about this collection (maybe it's new?) or it's legacy. // First update the legacy cluster state. - refreshLegacyClusterState(null); + changedLegacyCollections = refreshLegacyClusterState(null, liveNodes); if (!legacyCollectionStates.containsKey(collection)) { // No dice, see if a new collection just got created. LazyCollectionRef tryLazyCollection = new LazyCollectionRef(collection); @@ -298,14 +307,22 @@ public void forceUpdateCollection(String collection) throws KeeperException, Int return; } // Edge case: if there's no external collection, try refreshing legacy cluster state in case it's there. - refreshLegacyClusterState(null); + changedLegacyCollections = refreshLegacyClusterState(null, liveNodes); } else if (watchedCollectionStates.containsKey(collection)) { // Exists as a watched collection, force a refresh. DocCollection newState = fetchCollectionState(collection, null); - updateWatchedCollection(liveNodes, collection, newState); + changed = updateWatchedCollection(liveNodes, collection, newState); } constructState(); } + if (changedLegacyCollections != null) { + for (String changedCollection : changedLegacyCollections) { + notifyStateWatchers(liveNodes, changedCollection, legacyCollectionStates.get(changedCollection).get()); + } + } + if (changed) { + notifyStateWatchers(liveNodes, collection, watchedCollectionStates.get(collection)); + } } /** Refresh the set of live nodes. */ @@ -352,10 +369,10 @@ public synchronized void createClusterStateWatchersAndUpdate() throws KeeperExce } // on reconnect of SolrZkClient force refresh and re-add watches. - refreshLegacyClusterState(new LegacyClusterStateWatcher()); + refreshLiveNodes(new LiveNodeWatcher()); + refreshLegacyClusterState(new LegacyClusterStateWatcher(), liveNodes); refreshStateFormat2Collections(); refreshCollectionList(new CollectionsChildWatcher()); - refreshLiveNodes(new LiveNodeWatcher()); synchronized (ZkStateReader.this.getUpdateLock()) { constructState(); @@ -486,20 +503,21 @@ private void constructState() { /** * Refresh legacy (shared) clusterstate.json + * + * Returns the collections that have changed */ - private void refreshLegacyClusterState(Watcher watcher) + private List refreshLegacyClusterState(Watcher watcher, Set liveNodes) throws KeeperException, InterruptedException { + List changedCollections = new ArrayList<>(); try { final Stat stat = new Stat(); final byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true); final ClusterState loadedData = ClusterState.load(stat.getVersion(), data, emptySet(), CLUSTER_STATE); - final Set liveNodes = new HashSet<>(this.liveNodes); synchronized (getUpdateLock()) { if (this.legacyClusterStateVersion >= stat.getVersion()) { // Nothing to do, someone else updated same or newer. - return; + return changedCollections; } - LOG.info("Updating legacy cluster state - {} entries in legacyCollectionStates", legacyCollectionStates.size()); for (Map.Entry watchEntry : this.collectionWatches.entrySet()) { String coll = watchEntry.getKey(); CollectionWatch collWatch = watchEntry.getValue(); @@ -510,7 +528,7 @@ private void refreshLegacyClusterState(Watcher watcher) DocCollection newState = ref.get(); if (!collWatch.stateWatchers.isEmpty() && !Objects.equals(loadedData.getCollectionStates().get(coll).get(), newState)) { - notifyStateWatchers(liveNodes, coll, newState); + changedCollections.add(coll); } } this.legacyCollectionStates = loadedData.getCollectionStates(); @@ -523,6 +541,7 @@ private void refreshLegacyClusterState(Watcher watcher) this.legacyClusterStateVersion = 0; } } + return changedCollections; } /** @@ -924,10 +943,13 @@ public void process(WatchedEvent event) { LOG.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])", event, coll, liveNodesSize); - refreshAndWatch(); + boolean changed = refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); } + if (changed) { + notifyStateWatchers(liveNodes, coll, watchedCollectionStates.get(coll)); + } } /** @@ -935,10 +957,10 @@ public void process(WatchedEvent event) { * As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates} * with the results of the refresh. */ - public void refreshAndWatch() { + public boolean refreshAndWatch() { try { DocCollection newState = fetchCollectionState(coll, this); - updateWatchedCollection(liveNodes, coll, newState); + return updateWatchedCollection(liveNodes, coll, newState); } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { @@ -948,6 +970,7 @@ public void refreshAndWatch() { Thread.currentThread().interrupt(); LOG.error("Unwatched collection: [{}]", coll, e); } + return false; } } @@ -962,16 +985,20 @@ public void process(WatchedEvent event) { } int liveNodesSize = ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size(); LOG.info("A cluster state change: [{}], has occurred - updating... (live nodes size: [{}])", event, liveNodesSize); - refreshAndWatch(); + List changedCollections = refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); } + Set liveNodes = ZkStateReader.this.liveNodes; // volatile read + for (String collection : changedCollections) { + notifyStateWatchers(liveNodes, collection, legacyCollectionStates.get(collection).get()); + } } /** Must hold {@link #getUpdateLock()} before calling this method. */ - public void refreshAndWatch() { + public List refreshAndWatch() { try { - refreshLegacyClusterState(this); + return refreshLegacyClusterState(this, liveNodes); } catch (KeeperException.NoNodeException e) { throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready"); @@ -985,6 +1012,7 @@ public void refreshAndWatch() { Thread.currentThread().interrupt(); LOG.warn("Interrupted", e); } + return Collections.emptyList(); } } @@ -1259,15 +1287,17 @@ int getStateWatchCount(String collection) { return watch.stateWatchers.size(); } - private void updateWatchedCollection(Set liveNodes, String coll, DocCollection newState) { + // returns true if the state has changed + private boolean updateWatchedCollection(Set liveNodes, String coll, DocCollection newState) { if (newState == null) { LOG.info("Deleting data for [{}]", coll); - notifyStateWatchers(liveNodes, coll, null); watchedCollectionStates.remove(coll); - return; + return true; } + boolean changed = false; + // CAS update loop while (true) { if (!collectionWatches.containsKey(coll)) { @@ -1277,7 +1307,7 @@ private void updateWatchedCollection(Set liveNodes, String coll, DocColl if (oldState == null) { if (watchedCollectionStates.putIfAbsent(coll, newState) == null) { LOG.info("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion()); - notifyStateWatchers(liveNodes, coll, newState); + changed = true; break; } } else { @@ -1287,7 +1317,7 @@ private void updateWatchedCollection(Set liveNodes, String coll, DocColl } if (watchedCollectionStates.replace(coll, oldState, newState)) { LOG.info("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion()); - notifyStateWatchers(liveNodes, coll, newState); + changed = true; break; } } @@ -1299,6 +1329,8 @@ private void updateWatchedCollection(Set liveNodes, String coll, DocColl LOG.info("Unwatching collection [{}]", coll); } + return changed; + } public static class ConfigData { diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java index 91d0242dd00f..d2863f2b42d7 100644 --- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java @@ -139,6 +139,9 @@ public void testPredicateFailureTimesOut() throws Exception { expectThrows(TimeoutException.class, () -> { client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false)); }); + long count = client.getZkStateReader().getStateWatchCount("nosuchcollection"); + assertTrue("Watchers for collection should be removed after timeout", count == 0); + } @Test From 231f1e217d71ad13be92970640367541c416a90e Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Mon, 2 May 2016 21:14:52 +0100 Subject: [PATCH 06/15] Move CollectionStateWatcher back out --- .../client/solrj/impl/CloudSolrClient.java | 16 +++++++ .../cloud/CollectionStatePredicate.java | 1 - .../common/cloud/CollectionStateWatcher.java | 42 +++++++++++++++++++ .../solr/common/cloud/ZkStateReader.java | 25 +++++------ .../cloud/TestCollectionStateWatchers.java | 33 ++++++++++----- 5 files changed, 93 insertions(+), 24 deletions(-) create mode 100644 solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java index e8a967407a79..e17921596bbb 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -57,6 +57,7 @@ import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.CollectionStatePredicate; +import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.ImplicitDocRouter; @@ -592,6 +593,21 @@ public void waitForState(String collection, long wait, TimeUnit unit, Collection zkStateReader.waitForState(collection, wait, unit, predicate); } + /** + * Register a CollectionStateWatcher to be called when the cluster state for a collection changes + * + * Note that the watcher is unregistered after it has been called once. To make a watcher persistent, + * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} + * call + * + * @param collection the collection to watch + * @param watcher a watcher that will be called when the state changes + */ + public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) { + connect(); + zkStateReader.registerCollectionStateWatcher(collection, watcher); + } + private NamedList directUpdate(AbstractUpdateRequest request, String collection, ClusterState clusterState) throws SolrServerException { UpdateRequest updateRequest = (UpdateRequest) request; ModifiableSolrParams params = (ModifiableSolrParams) request.getParams(); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java index 1112daa0143d..0b0a28eeed04 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java @@ -36,7 +36,6 @@ public interface CollectionStatePredicate { * @param liveNodes the current set of live nodes * @param collectionState the latest collection state, or null if the collection * does not exist - * */ boolean matches(Set liveNodes, DocCollection collectionState); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java new file mode 100644 index 000000000000..0bf66b012e8b --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java @@ -0,0 +1,42 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +import java.util.Set; + +/** + * Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)} + * and called whenever the collection state changes. + */ +public interface CollectionStateWatcher { + + /** + * Called when the collection we are registered against has a change of state + * + * Note that, due to the way Zookeeper watchers are implemented, a single call may be + * the result of several state changes + * + * A watcher is unregistered after it has been called once. To make a watcher persistent, + * implementors should re-register during this call. + * + * @param liveNodes the set of live nodes + * @param collectionState the new collection state + */ + void onStateChanged(Set liveNodes, DocCollection collectionState); + +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 582402b20307..345ff4239d16 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -133,7 +133,7 @@ public class ZkStateReader implements Closeable { private final Runnable securityNodeListener; - private final ConcurrentHashMap collectionWatches = new ConcurrentHashMap<>(); + private ConcurrentHashMap collectionWatches = new ConcurrentHashMap<>(); private class CollectionWatch { @@ -1181,7 +1181,14 @@ public void unregisterCore(String collection) { } } - private void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) { + /** + * Register a CollectionStateWatcher to be called when the state of a collection changes + * + * A given CollectionStateWatcher will be only called once. If you want to have a persistent watcher, + * it should register itself again in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} + * method. + */ + public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) { AtomicBoolean watchSet = new AtomicBoolean(false); collectionWatches.compute(collection, (k, v) -> { if (v == null) { @@ -1280,11 +1287,11 @@ private void notifyStateWatchers(Set liveNodes, String collection, DocCo } /* package-private for testing */ - int getStateWatchCount(String collection) { + Set getStateWatchers(String collection) { CollectionWatch watch = collectionWatches.get(collection); if (watch == null) - return 0; - return watch.stateWatchers.size(); + return null; + return new HashSet<>(watch.stateWatchers); } // returns true if the state has changed @@ -1326,7 +1333,7 @@ private boolean updateWatchedCollection(Set liveNodes, String coll, DocC // Resolve race with removeZKWatch. if (!collectionWatches.containsKey(coll)) { watchedCollectionStates.remove(coll); - LOG.info("Unwatching collection [{}]", coll); + LOG.info("Removing uninteresting collection [{}]", coll); } return changed; @@ -1346,10 +1353,4 @@ public ConfigData(Map data, int version) { } } - - private interface CollectionStateWatcher { - - void onStateChanged(Set liveNodes, DocCollection collectionState); - - } } diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java index d2863f2b42d7..5f91712487da 100644 --- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java @@ -18,11 +18,13 @@ */ import java.util.HashMap; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.CloudSolrClient; @@ -35,6 +37,8 @@ import org.junit.BeforeClass; import org.junit.Test; +import static org.hamcrest.core.Is.is; + public class TestCollectionStateWatchers extends SolrCloudTestCase { private static final int CLUSTER_SIZE = 4; @@ -88,19 +92,25 @@ public void testSimpleCollectionWatch() throws Exception { client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); // shutdown a node and check that we get notified about the change - Future future = waitInBackground("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (n, c) -> { - int nodecount = 0; - for (Slice slice : c) { + final AtomicInteger nodeCount = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(1); + client.registerCollectionStateWatcher("testcollection", (liveNodes, collectionState) -> { + // we can't just count liveNodes here, because that's updated by a separate watcher, + // and it may be the case that we're triggered by a node setting itself to DOWN before + // the liveNodes watcher is called + for (Slice slice : collectionState) { for (Replica replica : slice) { - if (replica.isActive(n)) - nodecount++; + if (replica.isActive(liveNodes)) + nodeCount.incrementAndGet(); } } - return nodecount == 3; + latch.countDown(); }); + cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); - assertTrue("CollectionStateWatcher was never notified of cluster change", future.get()); + assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); + assertThat(nodeCount.intValue(), is(3)); } @@ -178,21 +188,22 @@ public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Excepti public void testWatcherIsRemovedAfterTimeout() { CloudSolrClient client = cluster.getSolrClient(); assertTrue("There should be no watchers for a non-existent collection!", - client.getZkStateReader().getStateWatchCount("no-such-collection") == 0); + client.getZkStateReader().getStateWatchers("no-such-collection") == null); expectThrows(TimeoutException.class, () -> { client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, DocCollection::isFullyActive); }); - long count = client.getZkStateReader().getStateWatchCount("no-such-collection"); - assertTrue("Watchers for collection should be removed after timeout", count == 0); + Set watchers = client.getZkStateReader().getStateWatchers("no-such-collection"); + assertTrue("Watchers for collection should be removed after timeout", + watchers == null || watchers.size() == 0); } @Test public void testDeletionsTriggerWatches() throws Exception { cluster.createCollection("tobedeleted", 1, 1, "config", new HashMap<>()); - Future future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (n, c) -> c == null); + Future future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isDeleted); CollectionAdminRequest.deleteCollection("tobedeleted").process(cluster.getSolrClient()); From eab0d463ec1647fd5e181025854b26371779e2d5 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Tue, 3 May 2016 11:21:06 +0100 Subject: [PATCH 07/15] Move notifications to an executor --- .../solr/common/cloud/ZkStateReader.java | 43 +++++++++++++------ .../cloud/TestCollectionStateWatchers.java | 7 +-- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 345ff4239d16..486de67bbb66 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -33,6 +33,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,7 +42,9 @@ import org.apache.solr.common.Callable; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.Pair; +import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.apache.solr.common.util.Utils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -133,6 +136,10 @@ public class ZkStateReader implements Closeable { private final Runnable securityNodeListener; + private final ExecutorService collectionWatchExecutor = ExecutorUtil.newMDCAwareCachedThreadPool( + new SolrjNamedThreadFactory("collectionStateWatchers") + ); + private ConcurrentHashMap collectionWatches = new ConcurrentHashMap<>(); private class CollectionWatch { @@ -256,12 +263,12 @@ public void updateClusterState() throws KeeperException, InterruptedException { return; } // No need to set watchers because we should already have watchers registered for everything. - collectionsToNotify.addAll(refreshLegacyClusterState(null, liveNodes)); + collectionsToNotify.addAll(refreshLegacyClusterState(null)); // Need a copy so we don't delete from what we're iterating over. Collection safeCopy = new ArrayList<>(watchedCollectionStates.keySet()); for (String coll : safeCopy) { DocCollection newState = fetchCollectionState(coll, null); - if (updateWatchedCollection(liveNodes, coll, newState)) { + if (updateWatchedCollection(coll, newState)) { collectionsToNotify.add(coll); } } @@ -291,7 +298,7 @@ public void forceUpdateCollection(String collection) throws KeeperException, Int if (ref == null || legacyCollectionStates.containsKey(collection)) { // We either don't know anything about this collection (maybe it's new?) or it's legacy. // First update the legacy cluster state. - changedLegacyCollections = refreshLegacyClusterState(null, liveNodes); + changedLegacyCollections = refreshLegacyClusterState(null); if (!legacyCollectionStates.containsKey(collection)) { // No dice, see if a new collection just got created. LazyCollectionRef tryLazyCollection = new LazyCollectionRef(collection); @@ -307,11 +314,11 @@ public void forceUpdateCollection(String collection) throws KeeperException, Int return; } // Edge case: if there's no external collection, try refreshing legacy cluster state in case it's there. - changedLegacyCollections = refreshLegacyClusterState(null, liveNodes); + changedLegacyCollections = refreshLegacyClusterState(null); } else if (watchedCollectionStates.containsKey(collection)) { // Exists as a watched collection, force a refresh. DocCollection newState = fetchCollectionState(collection, null); - changed = updateWatchedCollection(liveNodes, collection, newState); + changed = updateWatchedCollection(collection, newState); } constructState(); } @@ -342,7 +349,7 @@ public Integer compareStateVersions(String coll, int version) { DocCollection nu = getCollectionLive(this, coll); if (nu == null) return -1 ; if (nu.getZNodeVersion() > collection.getZNodeVersion()) { - updateWatchedCollection(liveNodes, coll, nu); + updateWatchedCollection(coll, nu); collection = nu; } } @@ -370,7 +377,7 @@ public synchronized void createClusterStateWatchersAndUpdate() throws KeeperExce // on reconnect of SolrZkClient force refresh and re-add watches. refreshLiveNodes(new LiveNodeWatcher()); - refreshLegacyClusterState(new LegacyClusterStateWatcher(), liveNodes); + refreshLegacyClusterState(new LegacyClusterStateWatcher()); refreshStateFormat2Collections(); refreshCollectionList(new CollectionsChildWatcher()); @@ -506,7 +513,7 @@ private void constructState() { * * Returns the collections that have changed */ - private List refreshLegacyClusterState(Watcher watcher, Set liveNodes) + private List refreshLegacyClusterState(Watcher watcher) throws KeeperException, InterruptedException { List changedCollections = new ArrayList<>(); try { @@ -680,6 +687,12 @@ public Object getUpdateLock() { public void close() { this.closed = true; + try { + ExecutorUtil.shutdownAndAwaitTermination(collectionWatchExecutor); + } + catch (Exception e) { + LOG.error("Error shutting down collection watch executor", e); + } if (closeClient) { zkClient.close(); } @@ -960,7 +973,7 @@ public void process(WatchedEvent event) { public boolean refreshAndWatch() { try { DocCollection newState = fetchCollectionState(coll, this); - return updateWatchedCollection(liveNodes, coll, newState); + return updateWatchedCollection(coll, newState); } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { @@ -998,7 +1011,7 @@ public void process(WatchedEvent event) { /** Must hold {@link #getUpdateLock()} before calling this method. */ public List refreshAndWatch() { try { - return refreshLegacyClusterState(this, liveNodes); + return refreshLegacyClusterState(this); } catch (KeeperException.NoNodeException e) { throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready"); @@ -1281,9 +1294,11 @@ private void notifyStateWatchers(Set liveNodes, String collection, DocCo v.stateWatchers.clear(); return v; }); - for (CollectionStateWatcher watcher : watchers) { - watcher.onStateChanged(liveNodes, collectionState); - } + collectionWatchExecutor.submit(() -> { + for (CollectionStateWatcher watcher : watchers) { + watcher.onStateChanged(liveNodes, collectionState); + } + }); } /* package-private for testing */ @@ -1295,7 +1310,7 @@ Set getStateWatchers(String collection) { } // returns true if the state has changed - private boolean updateWatchedCollection(Set liveNodes, String coll, DocCollection newState) { + private boolean updateWatchedCollection(String coll, DocCollection newState) { if (newState == null) { LOG.info("Deleting data for [{}]", coll); diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java index 5f91712487da..51af26e577f6 100644 --- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java @@ -149,8 +149,9 @@ public void testPredicateFailureTimesOut() throws Exception { expectThrows(TimeoutException.class, () -> { client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false)); }); - long count = client.getZkStateReader().getStateWatchCount("nosuchcollection"); - assertTrue("Watchers for collection should be removed after timeout", count == 0); + Set watchers = client.getZkStateReader().getStateWatchers("nosuchcollection"); + assertTrue("Watchers for collection should be removed after timeout", + watchers == null || watchers.size() == 0); } @@ -203,7 +204,7 @@ public void testWatcherIsRemovedAfterTimeout() { @Test public void testDeletionsTriggerWatches() throws Exception { cluster.createCollection("tobedeleted", 1, 1, "config", new HashMap<>()); - Future future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isDeleted); + Future future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (l, c) -> c == null); CollectionAdminRequest.deleteCollection("tobedeleted").process(cluster.getSolrClient()); From 545c33592796852735e09dd91ffa10e1efd47c8f Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Mon, 9 May 2016 21:10:10 +0100 Subject: [PATCH 08/15] Remove executor again --- .../solr/common/cloud/ZkStateReader.java | 21 +++---------------- 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 38a0354f74dd..2dfaa87aef75 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -33,7 +33,6 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,9 +41,7 @@ import org.apache.solr.common.Callable; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; -import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.Pair; -import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.apache.solr.common.util.Utils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -138,10 +135,6 @@ public class ZkStateReader implements Closeable { private final Runnable securityNodeListener; - private final ExecutorService collectionWatchExecutor = ExecutorUtil.newMDCAwareCachedThreadPool( - new SolrjNamedThreadFactory("collectionStateWatchers") - ); - private ConcurrentHashMap collectionWatches = new ConcurrentHashMap<>(); private class CollectionWatch { @@ -691,12 +684,6 @@ public Object getUpdateLock() { public void close() { this.closed = true; - try { - ExecutorUtil.shutdownAndAwaitTermination(collectionWatchExecutor); - } - catch (Exception e) { - LOG.error("Error shutting down collection watch executor", e); - } if (closeClient) { zkClient.close(); } @@ -1297,11 +1284,9 @@ private void notifyStateWatchers(Set liveNodes, String collection, DocCo v.stateWatchers.clear(); return v; }); - collectionWatchExecutor.submit(() -> { - for (CollectionStateWatcher watcher : watchers) { - watcher.onStateChanged(liveNodes, collectionState); - } - }); + for (CollectionStateWatcher watcher : watchers) { + watcher.onStateChanged(liveNodes, collectionState); + } } /* package-private for testing */ From 7d667622850aa3eb65dec3340ddcf85c663b49bb Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Mon, 9 May 2016 21:18:08 +0100 Subject: [PATCH 09/15] Remove forwarding registerCollectionWatcher method on CSC --- .../solr/client/solrj/impl/CloudSolrClient.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java index e17921596bbb..e8a967407a79 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -57,7 +57,6 @@ import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.CollectionStatePredicate; -import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.ImplicitDocRouter; @@ -593,21 +592,6 @@ public void waitForState(String collection, long wait, TimeUnit unit, Collection zkStateReader.waitForState(collection, wait, unit, predicate); } - /** - * Register a CollectionStateWatcher to be called when the cluster state for a collection changes - * - * Note that the watcher is unregistered after it has been called once. To make a watcher persistent, - * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} - * call - * - * @param collection the collection to watch - * @param watcher a watcher that will be called when the state changes - */ - public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) { - connect(); - zkStateReader.registerCollectionStateWatcher(collection, watcher); - } - private NamedList directUpdate(AbstractUpdateRequest request, String collection, ClusterState clusterState) throws SolrServerException { UpdateRequest updateRequest = (UpdateRequest) request; ModifiableSolrParams params = (ModifiableSolrParams) request.getParams(); From 9dde353ceb9fff37a4f1487d9da385e94e6cbaba Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Tue, 10 May 2016 09:53:03 +0100 Subject: [PATCH 10/15] Revert "Remove forwarding registerCollectionWatcher method on CSC" This reverts commit 7d667622850aa3eb65dec3340ddcf85c663b49bb. --- .../solr/client/solrj/impl/CloudSolrClient.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java index e8a967407a79..e17921596bbb 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -57,6 +57,7 @@ import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.CollectionStatePredicate; +import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.ImplicitDocRouter; @@ -592,6 +593,21 @@ public void waitForState(String collection, long wait, TimeUnit unit, Collection zkStateReader.waitForState(collection, wait, unit, predicate); } + /** + * Register a CollectionStateWatcher to be called when the cluster state for a collection changes + * + * Note that the watcher is unregistered after it has been called once. To make a watcher persistent, + * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} + * call + * + * @param collection the collection to watch + * @param watcher a watcher that will be called when the state changes + */ + public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) { + connect(); + zkStateReader.registerCollectionStateWatcher(collection, watcher); + } + private NamedList directUpdate(AbstractUpdateRequest request, String collection, ClusterState clusterState) throws SolrServerException { UpdateRequest updateRequest = (UpdateRequest) request; ModifiableSolrParams params = (ModifiableSolrParams) request.getParams(); From 873c2256baef131272a95617c750915827ad285b Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Tue, 10 May 2016 11:31:49 +0100 Subject: [PATCH 11/15] Move to a notifications queue --- .../solr/common/cloud/ZkStateReader.java | 157 ++++++++++-------- .../apache/solr/common/util/ExecutorUtil.java | 11 +- 2 files changed, 92 insertions(+), 76 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 2dfaa87aef75..e44b04ffa11f 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -31,8 +31,11 @@ import java.util.Objects; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,6 +44,7 @@ import org.apache.solr.common.Callable; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.Utils; import org.apache.zookeeper.CreateMode; @@ -137,6 +141,9 @@ public class ZkStateReader implements Closeable { private ConcurrentHashMap collectionWatches = new ConcurrentHashMap<>(); + private final ExecutorService notificationsExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor("watches"); + private final BlockingQueue notifications = new LinkedBlockingQueue<>(); + private class CollectionWatch { int coreRefCount = 0; @@ -212,6 +219,7 @@ public ZkStateReader(SolrZkClient zkClient, Runnable securityNodeListener) { this.configManager = new ZkConfigManager(zkClient); this.closeClient = false; this.securityNodeListener = securityNodeListener; + this.notificationsExecutor.submit(new NotificationsExecutor()); } @@ -237,6 +245,7 @@ public void command() { this.configManager = new ZkConfigManager(zkClient); this.closeClient = true; this.securityNodeListener = null; + this.notificationsExecutor.submit(new NotificationsExecutor()); } public ZkConfigManager getConfigManager() { @@ -250,8 +259,6 @@ public ZkConfigManager getConfigManager() { */ @Deprecated public void updateClusterState() throws KeeperException, InterruptedException { - List collectionsToNotify = new ArrayList<>(); - Set liveNodes = this.liveNodes; // volatile read synchronized (getUpdateLock()) { if (clusterState == null) { // Never initialized, just run normal initialization. @@ -259,22 +266,17 @@ public void updateClusterState() throws KeeperException, InterruptedException { return; } // No need to set watchers because we should already have watchers registered for everything. - collectionsToNotify.addAll(refreshLegacyClusterState(null)); + refreshLegacyClusterState(null); // Need a copy so we don't delete from what we're iterating over. Collection safeCopy = new ArrayList<>(watchedCollectionStates.keySet()); for (String coll : safeCopy) { DocCollection newState = fetchCollectionState(coll, null); - if (updateWatchedCollection(coll, newState)) { - collectionsToNotify.add(coll); - } + updateWatchedCollection(coll, newState); } refreshCollectionList(null); refreshLiveNodes(null); constructState(); } - for (String collection : collectionsToNotify) { - notifyStateWatchers(liveNodes, collection, watchedCollectionStates.get(collection)); - } } /** @@ -282,9 +284,7 @@ public void updateClusterState() throws KeeperException, InterruptedException { * a better design is possible. */ public void forceUpdateCollection(String collection) throws KeeperException, InterruptedException { - boolean changed = false; - List changedLegacyCollections = null; - Set liveNodes = this.liveNodes; // volatile read + synchronized (getUpdateLock()) { if (clusterState == null) { return; @@ -294,7 +294,7 @@ public void forceUpdateCollection(String collection) throws KeeperException, Int if (ref == null || legacyCollectionStates.containsKey(collection)) { // We either don't know anything about this collection (maybe it's new?) or it's legacy. // First update the legacy cluster state. - changedLegacyCollections = refreshLegacyClusterState(null); + refreshLegacyClusterState(null); if (!legacyCollectionStates.containsKey(collection)) { // No dice, see if a new collection just got created. LazyCollectionRef tryLazyCollection = new LazyCollectionRef(collection); @@ -310,22 +310,15 @@ public void forceUpdateCollection(String collection) throws KeeperException, Int return; } // Edge case: if there's no external collection, try refreshing legacy cluster state in case it's there. - changedLegacyCollections = refreshLegacyClusterState(null); + refreshLegacyClusterState(null); } else if (watchedCollectionStates.containsKey(collection)) { // Exists as a watched collection, force a refresh. DocCollection newState = fetchCollectionState(collection, null); - changed = updateWatchedCollection(collection, newState); + updateWatchedCollection(collection, newState); } constructState(); } - if (changedLegacyCollections != null) { - for (String changedCollection : changedLegacyCollections) { - notifyStateWatchers(liveNodes, changedCollection, legacyCollectionStates.get(changedCollection).get()); - } - } - if (changed) { - notifyStateWatchers(liveNodes, collection, watchedCollectionStates.get(collection)); - } + } /** Refresh the set of live nodes. */ @@ -506,12 +499,8 @@ private void constructState() { /** * Refresh legacy (shared) clusterstate.json - * - * Returns the collections that have changed */ - private List refreshLegacyClusterState(Watcher watcher) - throws KeeperException, InterruptedException { - List changedCollections = new ArrayList<>(); + private void refreshLegacyClusterState(Watcher watcher) throws KeeperException, InterruptedException { try { final Stat stat = new Stat(); final byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true); @@ -519,8 +508,9 @@ private List refreshLegacyClusterState(Watcher watcher) synchronized (getUpdateLock()) { if (this.legacyClusterStateVersion >= stat.getVersion()) { // Nothing to do, someone else updated same or newer. - return changedCollections; + return; } + Set liveNodes = this.liveNodes; // volatile read for (Map.Entry watchEntry : this.collectionWatches.entrySet()) { String coll = watchEntry.getKey(); CollectionWatch collWatch = watchEntry.getValue(); @@ -531,7 +521,7 @@ private List refreshLegacyClusterState(Watcher watcher) DocCollection newState = ref.get(); if (!collWatch.stateWatchers.isEmpty() && !Objects.equals(loadedData.getCollectionStates().get(coll).get(), newState)) { - changedCollections.add(coll); + notifyStateWatchers(liveNodes, coll, newState); } } this.legacyCollectionStates = loadedData.getCollectionStates(); @@ -544,7 +534,6 @@ private List refreshLegacyClusterState(Watcher watcher) this.legacyClusterStateVersion = 0; } } - return changedCollections; } /** @@ -684,6 +673,8 @@ public Object getUpdateLock() { public void close() { this.closed = true; + notificationsExecutor.shutdownNow(); // interrupt + ExecutorUtil.shutdownAndAwaitTermination(notificationsExecutor); if (closeClient) { zkClient.close(); } @@ -946,13 +937,11 @@ public void process(WatchedEvent event) { LOG.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])", event, coll, liveNodesSize); - boolean changed = refreshAndWatch(); + refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); } - if (changed) { - notifyStateWatchers(liveNodes, coll, watchedCollectionStates.get(coll)); - } + } /** @@ -960,10 +949,10 @@ public void process(WatchedEvent event) { * As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates} * with the results of the refresh. */ - public boolean refreshAndWatch() { + public void refreshAndWatch() { try { DocCollection newState = fetchCollectionState(coll, this); - return updateWatchedCollection(coll, newState); + updateWatchedCollection(coll, newState); } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { @@ -973,7 +962,6 @@ public boolean refreshAndWatch() { Thread.currentThread().interrupt(); LOG.error("Unwatched collection: [{}]", coll, e); } - return false; } } @@ -988,20 +976,16 @@ public void process(WatchedEvent event) { } int liveNodesSize = ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size(); LOG.info("A cluster state change: [{}], has occurred - updating... (live nodes size: [{}])", event, liveNodesSize); - List changedCollections = refreshAndWatch(); + refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); } - Set liveNodes = ZkStateReader.this.liveNodes; // volatile read - for (String collection : changedCollections) { - notifyStateWatchers(liveNodes, collection, legacyCollectionStates.get(collection).get()); - } } /** Must hold {@link #getUpdateLock()} before calling this method. */ - public List refreshAndWatch() { + public void refreshAndWatch() { try { - return refreshLegacyClusterState(this); + refreshLegacyClusterState(this); } catch (KeeperException.NoNodeException e) { throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready"); @@ -1015,7 +999,6 @@ public List refreshAndWatch() { Thread.currentThread().interrupt(); LOG.warn("Interrupted", e); } - return Collections.emptyList(); } } @@ -1275,20 +1258,6 @@ public void removeCollectionStateWatcher(String collection, CollectionStateWatch }); } - private void notifyStateWatchers(Set liveNodes, String collection, DocCollection collectionState) { - List watchers = new ArrayList<>(); - collectionWatches.compute(collection, (k, v) -> { - if (v == null) - return null; - watchers.addAll(v.stateWatchers); - v.stateWatchers.clear(); - return v; - }); - for (CollectionStateWatcher watcher : watchers) { - watcher.onStateChanged(liveNodes, collectionState); - } - } - /* package-private for testing */ Set getStateWatchers(String collection) { CollectionWatch watch = collectionWatches.get(collection); @@ -1298,16 +1267,17 @@ Set getStateWatchers(String collection) { } // returns true if the state has changed - private boolean updateWatchedCollection(String coll, DocCollection newState) { + private void updateWatchedCollection(String coll, DocCollection newState) { + + Set liveNodes = this.liveNodes; // volatile read if (newState == null) { LOG.info("Deleting data for [{}]", coll); watchedCollectionStates.remove(coll); - return true; + notifyStateWatchers(liveNodes, coll, null); + return; } - boolean changed = false; - // CAS update loop while (true) { if (!collectionWatches.containsKey(coll)) { @@ -1317,7 +1287,7 @@ private boolean updateWatchedCollection(String coll, DocCollection newState) { if (oldState == null) { if (watchedCollectionStates.putIfAbsent(coll, newState) == null) { LOG.info("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion()); - changed = true; + notifyStateWatchers(liveNodes, coll, newState); break; } } else { @@ -1327,20 +1297,18 @@ private boolean updateWatchedCollection(String coll, DocCollection newState) { } if (watchedCollectionStates.replace(coll, oldState, newState)) { LOG.info("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion()); - changed = true; + notifyStateWatchers(liveNodes, coll, newState); break; } } } - // Resolve race with removeZKWatch. + // Resolve race with unregisterCore. if (!collectionWatches.containsKey(coll)) { watchedCollectionStates.remove(coll); LOG.info("Removing uninteresting collection [{}]", coll); } - return changed; - } public static class ConfigData { @@ -1356,4 +1324,55 @@ public ConfigData(Map data, int version) { } } + + private void notifyStateWatchers(Set liveNodes, String collection, DocCollection collectionState) { + notifications.add(new Notification(liveNodes, collection, collectionState)); + } + + private class Notification implements Runnable { + + final Set liveNodes; + final String collection; + final DocCollection collectionState; + + private Notification(Set liveNodes, String collection, DocCollection collectionState) { + this.liveNodes = liveNodes; + this.collection = collection; + this.collectionState = collectionState; + } + + @Override + public void run() { + List watchers = new ArrayList<>(); + collectionWatches.compute(collection, (k, v) -> { + if (v == null) + return null; + watchers.addAll(v.stateWatchers); + v.stateWatchers.clear(); + return v; + }); + for (CollectionStateWatcher watcher : watchers) { + watcher.onStateChanged(liveNodes, collectionState); + } + } + + } + + private class NotificationsExecutor implements Runnable { + @Override + public void run() { + while (closed == false && Thread.currentThread().isInterrupted() == false) { + try { + Notification notification = notifications.take(); + notification.run(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + catch (RuntimeException e) { + LOG.error("Error running collection watch", e); + } + } + } + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java index b8d9ac41b0af..4222856158d2 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java @@ -19,16 +19,9 @@ import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; - -import java.util.Enumeration; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; @@ -153,6 +146,10 @@ public static ExecutorService newMDCAwareSingleThreadExecutor(ThreadFactory thre threadFactory); } + public static ExecutorService newMDCAwareSingleThreadExecutor(String name) { + return newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory(name)); + } + /** * See {@link java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)} */ From 7f201d28c89eb041323ed0623589aa08f4d53f25 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Tue, 10 May 2016 19:40:21 +0100 Subject: [PATCH 12/15] Executors are simpler, aren't they... --- .../solr/common/cloud/ZkStateReader.java | 38 ++++++------------- .../apache/solr/common/util/ExecutorUtil.java | 10 +++++ 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index e44b04ffa11f..c695cfb343ed 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -31,11 +31,10 @@ import java.util.Objects; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -141,8 +140,7 @@ public class ZkStateReader implements Closeable { private ConcurrentHashMap collectionWatches = new ConcurrentHashMap<>(); - private final ExecutorService notificationsExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor("watches"); - private final BlockingQueue notifications = new LinkedBlockingQueue<>(); + private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches"); private class CollectionWatch { @@ -219,7 +217,6 @@ public ZkStateReader(SolrZkClient zkClient, Runnable securityNodeListener) { this.configManager = new ZkConfigManager(zkClient); this.closeClient = false; this.securityNodeListener = securityNodeListener; - this.notificationsExecutor.submit(new NotificationsExecutor()); } @@ -245,7 +242,6 @@ public void command() { this.configManager = new ZkConfigManager(zkClient); this.closeClient = true; this.securityNodeListener = null; - this.notificationsExecutor.submit(new NotificationsExecutor()); } public ZkConfigManager getConfigManager() { @@ -673,8 +669,8 @@ public Object getUpdateLock() { public void close() { this.closed = true; - notificationsExecutor.shutdownNow(); // interrupt - ExecutorUtil.shutdownAndAwaitTermination(notificationsExecutor); + notifications.shutdownNow(); // interrupt + ExecutorUtil.shutdownAndAwaitTermination(notifications); if (closeClient) { zkClient.close(); } @@ -1326,7 +1322,14 @@ public ConfigData(Map data, int version) { } private void notifyStateWatchers(Set liveNodes, String collection, DocCollection collectionState) { - notifications.add(new Notification(liveNodes, collection, collectionState)); + try { + notifications.submit(new Notification(liveNodes, collection, collectionState)); + } + catch (RejectedExecutionException e) { + if (closed == false) { + LOG.error("Couldn't run collection notifications for {}", collection, e); + } + } } private class Notification implements Runnable { @@ -1358,21 +1361,4 @@ public void run() { } - private class NotificationsExecutor implements Runnable { - @Override - public void run() { - while (closed == false && Thread.currentThread().isInterrupted() == false) { - try { - Notification notification = notifications.take(); - notification.run(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - catch (RuntimeException e) { - LOG.error("Error running collection watch", e); - } - } - } - } } diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java index 4222856158d2..eb008c969cf4 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java @@ -146,10 +146,20 @@ public static ExecutorService newMDCAwareSingleThreadExecutor(ThreadFactory thre threadFactory); } + /** + * Create a single thread executor using a named thread factory + */ public static ExecutorService newMDCAwareSingleThreadExecutor(String name) { return newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory(name)); } + /** + * Create a cached thread pool using a named thread factory + */ + public static ExecutorService newMDCAwareCachedThreadPool(String name) { + return newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(name)); + } + /** * See {@link java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)} */ From 82bb198e905fbde2cf6de3771c51325263fc0bf2 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Wed, 11 May 2016 09:52:37 +0100 Subject: [PATCH 13/15] cleanups --- .../java/org/apache/solr/common/cloud/ZkStateReader.java | 9 ++++----- .../java/org/apache/solr/common/util/ExecutorUtil.java | 7 ------- .../solr/common/cloud/TestCollectionStateWatchers.java | 2 +- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index c695cfb343ed..5fc6e2d88bd6 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -513,7 +513,7 @@ private void refreshLegacyClusterState(Watcher watcher) throws KeeperException, ClusterState.CollectionRef ref = this.legacyCollectionStates.get(coll); if (ref == null) continue; - // watched collection, so this will always be local + // legacy collections are always in-memory DocCollection newState = ref.get(); if (!collWatch.stateWatchers.isEmpty() && !Objects.equals(loadedData.getCollectionStates().get(coll).get(), newState)) { @@ -669,8 +669,7 @@ public Object getUpdateLock() { public void close() { this.closed = true; - notifications.shutdownNow(); // interrupt - ExecutorUtil.shutdownAndAwaitTermination(notifications); + notifications.shutdown(); if (closeClient) { zkClient.close(); } @@ -1123,7 +1122,7 @@ public void registerCore(String collection) { v.coreRefCount++; return v; }); - if (reconstructState.get()) { + if (reconstructState.get() && legacyCollectionStates.containsKey(collection) == false) { new StateWatcher(collection).refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); @@ -1180,7 +1179,7 @@ public void registerCollectionStateWatcher(String collection, CollectionStateWat v.stateWatchers.add(stateWatcher); return v; }); - if (watchSet.get()) { + if (watchSet.get() && legacyCollectionStates.containsKey(collection) == false) { new StateWatcher(collection).refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java index eb008c969cf4..5f307a8bcc02 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java @@ -146,13 +146,6 @@ public static ExecutorService newMDCAwareSingleThreadExecutor(ThreadFactory thre threadFactory); } - /** - * Create a single thread executor using a named thread factory - */ - public static ExecutorService newMDCAwareSingleThreadExecutor(String name) { - return newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory(name)); - } - /** * Create a cached thread pool using a named thread factory */ diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java index 51af26e577f6..d44fb3ed155b 100644 --- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java @@ -135,7 +135,7 @@ public void testWaitForStateChecksCurrentState() throws Exception { } @Test - public void testCanWatchForNonexistantCollection() throws Exception { + public void testCanWaitForNonexistantCollection() throws Exception { Future future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); cluster.createCollection("delayed", 1, 1, "config", new HashMap<>()); From ee8d7879228213182f684039d993c13bb9883d41 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Wed, 11 May 2016 20:40:20 +0100 Subject: [PATCH 14/15] ...and one more --- .../src/java/org/apache/solr/common/cloud/ZkStateReader.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 5fc6e2d88bd6..f36560bd5a8e 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -1122,7 +1122,7 @@ public void registerCore(String collection) { v.coreRefCount++; return v; }); - if (reconstructState.get() && legacyCollectionStates.containsKey(collection) == false) { + if (reconstructState.get()) { new StateWatcher(collection).refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); @@ -1179,7 +1179,7 @@ public void registerCollectionStateWatcher(String collection, CollectionStateWat v.stateWatchers.add(stateWatcher); return v; }); - if (watchSet.get() && legacyCollectionStates.containsKey(collection) == false) { + if (watchSet.get()) { new StateWatcher(collection).refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); From ba904f993c2509ff4a1cbe80d821510a7b9a74d3 Mon Sep 17 00:00:00 2001 From: Alan Woodward Date: Wed, 11 May 2016 23:52:01 +0100 Subject: [PATCH 15/15] Get tests compiling --- .../solr/cloud/overseer/ZkStateReaderTest.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java index 2964d7b01d2a..ad51614b19ce 100644 --- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java @@ -62,6 +62,7 @@ public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting ZkTestServer server = new ZkTestServer(zkDir); SolrZkClient zkClient = null; + ZkStateReader reader = null; try { server.run(); @@ -71,7 +72,7 @@ public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); ZkController.createClusterZkNodes(zkClient); - ZkStateReader reader = new ZkStateReader(zkClient); + reader = new ZkStateReader(zkClient); reader.createClusterStateWatchersAndUpdate(); if (isInteresting) { reader.registerCore("c1"); @@ -136,7 +137,7 @@ public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting assertEquals(2, collection.getStateFormat()); } } finally { - IOUtils.close(zkClient); + IOUtils.close(reader, zkClient); server.shutdown(); } @@ -146,6 +147,7 @@ public void testExternalCollectionWatchedNotWatched() throws Exception{ String zkDir = createTempDir("testExternalCollectionWatchedNotWatched").toFile().getAbsolutePath(); ZkTestServer server = new ZkTestServer(zkDir); SolrZkClient zkClient = null; + ZkStateReader reader = null; try { server.run(); @@ -155,7 +157,7 @@ public void testExternalCollectionWatchedNotWatched() throws Exception{ zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); ZkController.createClusterZkNodes(zkClient); - ZkStateReader reader = new ZkStateReader(zkClient); + reader = new ZkStateReader(zkClient); reader.createClusterStateWatchersAndUpdate(); ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats()); @@ -176,7 +178,7 @@ public void testExternalCollectionWatchedNotWatched() throws Exception{ assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); } finally { - IOUtils.close(zkClient); + IOUtils.close(reader, zkClient); server.shutdown(); } } @@ -187,6 +189,7 @@ public void testWatchedCollectionCreation() throws Exception { ZkTestServer server = new ZkTestServer(zkDir); SolrZkClient zkClient = null; + ZkStateReader reader = null; try { server.run(); @@ -196,7 +199,7 @@ public void testWatchedCollectionCreation() throws Exception { zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); ZkController.createClusterZkNodes(zkClient); - ZkStateReader reader = new ZkStateReader(zkClient); + reader = new ZkStateReader(zkClient); reader.createClusterStateWatchersAndUpdate(); reader.registerCore("c1"); @@ -234,7 +237,7 @@ public void testWatchedCollectionCreation() throws Exception { assertFalse(ref.isLazilyLoaded()); assertEquals(2, ref.get().getStateFormat()); } finally { - IOUtils.close(zkClient); + IOUtils.close(reader, zkClient); server.shutdown(); }