From aa3a96cd88d1724f3ab641f25c5373ff58f4616d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torsten=20B=C3=B8gh=20Ko=CC=88ster?= Date: Tue, 18 Apr 2017 11:43:52 +0200 Subject: [PATCH 1/2] Fixes a memory leak in zk schema watching --- .../solr/schema/ZkIndexSchemaReader.java | 100 ++++++++++++------ 1 file changed, 67 insertions(+), 33 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java index ee65fe84d6b7..cb208102c4d1 100644 --- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java +++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java @@ -17,6 +17,8 @@ package org.apache.solr.schema; import java.io.ByteArrayInputStream; import java.lang.invoke.MethodHandles; +import java.lang.ref.WeakReference; +import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.solr.cloud.ZkSolrResourceLoader; @@ -42,7 +44,7 @@ public class ZkIndexSchemaReader implements OnReconnect { private SolrZkClient zkClient; private String managedSchemaPath; private final String uniqueCoreId; // used in equals impl to uniquely identify the core that we're dependent on - private boolean isRemoved = false; + private final SchemaWatcher schemaWatcher; public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory, SolrCore solrCore) { this.managedIndexSchemaFactory = managedIndexSchemaFactory; @@ -58,7 +60,9 @@ public void preClose(SolrCore core) { CoreContainer cc = core.getCoreDescriptor().getCoreContainer(); if (cc.isZooKeeperAware()) { log.debug("Removing ZkIndexSchemaReader OnReconnect listener as core "+core.getName()+" is shutting down."); - ZkIndexSchemaReader.this.isRemoved = true; + if (schemaWatcher != null) { + schemaWatcher.stopWatching(); + } cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this); } } @@ -67,7 +71,7 @@ public void preClose(SolrCore core) { public void postClose(SolrCore core) {} }); - createSchemaWatcher(); + this.schemaWatcher = createSchemaWatcher(); zkLoader.getZkController().addOnReconnectListener(this); } @@ -76,39 +80,17 @@ public Object getSchemaUpdateLock() { return managedIndexSchemaFactory.getSchemaUpdateLock(); } - public void createSchemaWatcher() { + /** + * Creates a schema watcher and returns it for controlling purposes. + * + * @return the registered {@linkplain SchemaWatcher}. + */ + public SchemaWatcher createSchemaWatcher() { log.info("Creating ZooKeeper watch for the managed schema at " + managedSchemaPath); + SchemaWatcher watcher = new SchemaWatcher(this); try { - zkClient.exists(managedSchemaPath, new Watcher() { - @Override - public void process(WatchedEvent event) { - - if (ZkIndexSchemaReader.this.isRemoved) { - return; // the core for this reader has already been removed, don't process this event - } - - // session events are not change events, and do not remove the watcher - if (Event.EventType.None.equals(event.getType())) { - return; - } - log.info("A schema change: {}, has occurred - updating schema from ZooKeeper ...", event); - try { - updateSchema(this, -1); - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) { - log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); - return; - } - log.error("", e); - throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e); - } catch (InterruptedException e) { - // Restore the interrupted status - Thread.currentThread().interrupt(); - log.warn("", e); - } - } - }, true); + zkClient.exists(managedSchemaPath, watcher, true); } catch (KeeperException e) { final String msg = "Error creating ZooKeeper watch for the managed schema"; log.error(msg, e); @@ -118,6 +100,58 @@ public void process(WatchedEvent event) { Thread.currentThread().interrupt(); log.warn("", e); } + + return watcher; + } + + /** + * Watches for schema changes and triggers updates in the {@linkplain ZkIndexSchemaReader}. Watching can be stopped + * via {@linkplain #stopWatching()} which will remove references to the held {@linkplain SolrCore} and + * {@linkplain ZkIndexSchemaReader}. + */ + public static class SchemaWatcher implements Watcher { + + private final WeakReference schemaReader; + + public SchemaWatcher(ZkIndexSchemaReader reader) { + this.schemaReader = new WeakReference(Objects.requireNonNull(reader)); + } + + @Override + public void process(WatchedEvent event) { + ZkIndexSchemaReader indexSchemaReader = schemaReader.get(); + + if (indexSchemaReader == null) { + return; // the core for this reader has already been removed, don't process this event + } + + // session events are not change events, and do not remove the watcher + if (Event.EventType.None.equals(event.getType())) { + return; + } + log.info("A schema change: {}, has occurred - updating schema from ZooKeeper ...", event); + try { + indexSchemaReader.updateSchema(this, -1); + } catch (KeeperException e) { + if (e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) { + log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); + return; + } + log.error("", e); + throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e); + } catch (InterruptedException e) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + log.warn("", e); + } + } + + /** + * Stop watching for changes and remove all references to the surrounding schema reader. + */ + public void stopWatching() { + schemaReader.clear(); + } } public ManagedIndexSchema refreshSchemaFromZk(int expectedZkVersion) throws KeeperException, InterruptedException { From 4a2ecad154fb18bd224edc8fa716f8847ccbd7c5 Mon Sep 17 00:00:00 2001 From: Christine Poerschke Date: Wed, 26 Apr 2017 09:04:23 +0100 Subject: [PATCH 2/2] two suggestions * both createSchemaWatcher calls' result to be saved * removal of schemaWatcher null check since in practice schemaWatcher is never(?) null but schemaWatcher can be re-assigned to --- .../org/apache/solr/schema/ZkIndexSchemaReader.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java index cb208102c4d1..f54432261687 100644 --- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java +++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java @@ -44,7 +44,7 @@ public class ZkIndexSchemaReader implements OnReconnect { private SolrZkClient zkClient; private String managedSchemaPath; private final String uniqueCoreId; // used in equals impl to uniquely identify the core that we're dependent on - private final SchemaWatcher schemaWatcher; + private SchemaWatcher schemaWatcher; public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory, SolrCore solrCore) { this.managedIndexSchemaFactory = managedIndexSchemaFactory; @@ -60,9 +60,7 @@ public void preClose(SolrCore core) { CoreContainer cc = core.getCoreDescriptor().getCoreContainer(); if (cc.isZooKeeperAware()) { log.debug("Removing ZkIndexSchemaReader OnReconnect listener as core "+core.getName()+" is shutting down."); - if (schemaWatcher != null) { - schemaWatcher.stopWatching(); - } + schemaWatcher.stopWatching(); cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this); } } @@ -190,8 +188,10 @@ private void updateSchema(Watcher watcher, int expectedZkVersion) throws KeeperE @Override public void command() { try { + // stop watching on the old watcher + this.schemaWatcher.stopWatching(); // setup a new watcher to get notified when the managed schema changes - createSchemaWatcher(); + this.schemaWatcher = createSchemaWatcher(); // force update now as the schema may have changed while our zk session was expired updateSchema(null, -1); } catch (Exception exc) {