From aa3a96cd88d1724f3ab641f25c5373ff58f4616d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Torsten=20B=C3=B8gh=20Ko=CC=88ster?=
Date: Tue, 18 Apr 2017 11:43:52 +0200
Subject: [PATCH 1/2] Fixes a memory leak in zk schema watching

---
Review note: the generic type arguments on WeakReference, which had been
stripped from this patch text, are restored below (without them the
assignment from schemaReader.get() does not compile). Line structure was
rebuilt to match the hunk headers; no other code line is changed.

 .../solr/schema/ZkIndexSchemaReader.java | 100 ++++++++++++------
 1 file changed, 67 insertions(+), 33 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
index ee65fe84d6b7..cb208102c4d1 100644
--- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
@@ -17,6 +17,8 @@ package org.apache.solr.schema;
 
 import java.io.ByteArrayInputStream;
 import java.lang.invoke.MethodHandles;
+import java.lang.ref.WeakReference;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.cloud.ZkSolrResourceLoader;
@@ -42,7 +44,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
   private SolrZkClient zkClient;
   private String managedSchemaPath;
   private final String uniqueCoreId; // used in equals impl to uniquely identify the core that we're dependent on
-  private boolean isRemoved = false;
+  private final SchemaWatcher schemaWatcher;
 
   public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory, SolrCore solrCore) {
     this.managedIndexSchemaFactory = managedIndexSchemaFactory;
@@ -58,7 +60,9 @@ public void preClose(SolrCore core) {
         CoreContainer cc = core.getCoreDescriptor().getCoreContainer();
         if (cc.isZooKeeperAware()) {
           log.debug("Removing ZkIndexSchemaReader OnReconnect listener as core "+core.getName()+" is shutting down.");
-          ZkIndexSchemaReader.this.isRemoved = true;
+          if (schemaWatcher != null) {
+            schemaWatcher.stopWatching();
+          }
           cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this);
         }
       }
@@ -67,7 +71,7 @@ public void preClose(SolrCore core) {
       public void postClose(SolrCore core) {}
     });
 
-    createSchemaWatcher();
+    this.schemaWatcher = createSchemaWatcher();
 
     zkLoader.getZkController().addOnReconnectListener(this);
   }
@@ -76,39 +80,17 @@ public Object getSchemaUpdateLock() {
     return managedIndexSchemaFactory.getSchemaUpdateLock();
   }
 
-  public void createSchemaWatcher() {
+  /**
+   * Creates a schema watcher and returns it for controlling purposes.
+   *
+   * @return the registered {@linkplain SchemaWatcher}.
+   */
+  public SchemaWatcher createSchemaWatcher() {
     log.info("Creating ZooKeeper watch for the managed schema at " + managedSchemaPath);
+    SchemaWatcher watcher = new SchemaWatcher(this);
 
     try {
-      zkClient.exists(managedSchemaPath, new Watcher() {
-        @Override
-        public void process(WatchedEvent event) {
-
-          if (ZkIndexSchemaReader.this.isRemoved) {
-            return; // the core for this reader has already been removed, don't process this event
-          }
-
-          // session events are not change events, and do not remove the watcher
-          if (Event.EventType.None.equals(event.getType())) {
-            return;
-          }
-          log.info("A schema change: {}, has occurred - updating schema from ZooKeeper ...", event);
-          try {
-            updateSchema(this, -1);
-          } catch (KeeperException e) {
-            if (e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-              return;
-            }
-            log.error("", e);
-            throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
-          } catch (InterruptedException e) {
-            // Restore the interrupted status
-            Thread.currentThread().interrupt();
-            log.warn("", e);
-          }
-        }
-      }, true);
+      zkClient.exists(managedSchemaPath, watcher, true);
     } catch (KeeperException e) {
       final String msg = "Error creating ZooKeeper watch for the managed schema";
       log.error(msg, e);
@@ -118,6 +100,58 @@ public void process(WatchedEvent event) {
       Thread.currentThread().interrupt();
       log.warn("", e);
     }
+
+    return watcher;
+  }
+
+  /**
+   * Watches for schema changes and triggers updates in the {@linkplain ZkIndexSchemaReader}. Watching can be stopped
+   * via {@linkplain #stopWatching()} which will remove references to the held {@linkplain SolrCore} and
+   * {@linkplain ZkIndexSchemaReader}.
+   */
+  public static class SchemaWatcher implements Watcher {
+
+    private final WeakReference<ZkIndexSchemaReader> schemaReader;
+
+    public SchemaWatcher(ZkIndexSchemaReader reader) {
+      this.schemaReader = new WeakReference<>(Objects.requireNonNull(reader));
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      ZkIndexSchemaReader indexSchemaReader = schemaReader.get();
+
+      if (indexSchemaReader == null) {
+        return; // the core for this reader has already been removed, don't process this event
+      }
+
+      // session events are not change events, and do not remove the watcher
+      if (Event.EventType.None.equals(event.getType())) {
+        return;
+      }
+      log.info("A schema change: {}, has occurred - updating schema from ZooKeeper ...", event);
+      try {
+        indexSchemaReader.updateSchema(this, -1);
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+          return;
+        }
+        log.error("", e);
+        throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.warn("", e);
+      }
+    }
+
+    /**
+     * Stop watching for changes and remove all references to the surrounding schema reader.
+     */
+    public void stopWatching() {
+      schemaReader.clear();
+    }
   }
 
   public ManagedIndexSchema refreshSchemaFromZk(int expectedZkVersion) throws KeeperException, InterruptedException {

From 0c1cf969d487bbb63987624d887832f6ab7a8351 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Torsten=20B=C3=B8gh=20Ko=CC=88ster?=
Date: Fri, 21 Apr 2017 14:51:21 +0200
Subject: [PATCH 2/2] clear cached field sources on core close

---
Review note: the type argument on RefCounted, which had been stripped from
this patch text, is restored below (searcher.get() must yield a
SolrIndexSearcher for getIndexReader()/getRawReader() to resolve). The raw
Map cast in FileFloatSource compiles as written and is left untouched.

 .../schema/ExternalFileFieldReloader.java | 27 +++++++++++++++++++
 .../solr/search/function/FileFloatSource.java | 24 ++++++++++++++++-
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/solr/core/src/java/org/apache/solr/schema/ExternalFileFieldReloader.java b/solr/core/src/java/org/apache/solr/schema/ExternalFileFieldReloader.java
index b091d7788e48..b786aa232bc4 100644
--- a/solr/core/src/java/org/apache/solr/schema/ExternalFileFieldReloader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ExternalFileFieldReloader.java
@@ -19,9 +19,11 @@
 import org.apache.lucene.index.IndexReader;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.AbstractSolrEventListener;
+import org.apache.solr.core.CloseHook;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.search.function.FileFloatSource;
+import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +60,26 @@ public class ExternalFileFieldReloader extends AbstractSolrEventListener {
   public ExternalFileFieldReloader(SolrCore core) {
     super(core);
     datadir = core.getDataDir();
+
+    core.addCloseHook(new CloseHook() {
+
+      @Override
+      public void preClose(SolrCore core) {
+        RefCounted<SolrIndexSearcher> searcher = core.getSearcher();
+        try {
+          FileFloatSource.resetCacheFor(searcher.get().getIndexReader());
+          FileFloatSource.resetCacheFor(searcher.get().getRawReader());
+          fieldSources.clear();
+        } finally {
+          searcher.decref();
+        }
+      }
+
+      @Override
+      public void postClose(SolrCore core) {
+        // noop
+      }
+    });
   }
 
   @Override
@@ -75,6 +97,11 @@ public void newSearcher(SolrIndexSearcher newSearcher, SolrIndexSearcher current
     for (FileFloatSource fieldSource : fieldSources) {
       fieldSource.refreshCache(reader);
     }
+
+    if (currentSearcher != null) {
+      FileFloatSource.resetCacheFor(currentSearcher.getIndexReader());
+      FileFloatSource.resetCacheFor(currentSearcher.getRawReader());
+    }
   }
 
   /** Caches FileFloatSource's from all ExternalFileField instances in the schema */
diff --git a/solr/core/src/java/org/apache/solr/search/function/FileFloatSource.java b/solr/core/src/java/org/apache/solr/search/function/FileFloatSource.java
index 58e9d2ff92fc..371749930c65 100644
--- a/solr/core/src/java/org/apache/solr/search/function/FileFloatSource.java
+++ b/solr/core/src/java/org/apache/solr/search/function/FileFloatSource.java
@@ -125,6 +125,14 @@ public static void resetCache(){
     floatCache.resetCache();
   }
 
+  /**
+   * Remove all cached entries associated with the given index reader.
+   * Values are lazily loaded next time getValues() is called.
+   */
+  public static void resetCacheFor(IndexReader reader){
+    floatCache.resetCacheFor(reader);
+  }
+
   /**
    * Refresh the cache for an IndexReader. The new values are loaded in the background
    * and then swapped in, so queries against the cache should not block while the reload
@@ -207,8 +215,22 @@ public void resetCache(){
         readerCache.clear();
       }
     }
-  }
 
+    /**
+     * Removes and clears the inner cache for the given index reader
+     */
+    public void resetCacheFor(IndexReader reader){
+      synchronized(readerCache){
+        Map innerCache = (Map) readerCache.remove(reader);
+        if (innerCache != null) {
+          // Map.clear() is optional and can throw UnsupportedOperationException,
+          // but readerCache is WeakHashMap and it supports clear().
+          innerCache.clear();
+        }
+      }
+    }
+  }
+
   static Object onlyForTesting; // set to the last value
 
   static final class CreationPlaceholder {