From 68095d5aee5e5642679d7b092e12d2b16c18ccb1 Mon Sep 17 00:00:00 2001 From: Jeff Storck Date: Mon, 19 Jun 2017 12:18:53 -0400 Subject: [PATCH] NIFI-1586 Implements autopurging of transaction log and snapshot files When NiFi is clustered, and autopurge.purgeInterval is greater than 1, the DatadirCleanupManager will be started in order to automatically purge transaction log and snapshot files based on the autopurge settings in zookeeper.properties --- .../state/server/ZooKeeperStateServer.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java index 4c4b418e06a9..6cf4f3413ab3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java @@ -25,6 +25,7 @@ import java.util.Properties; import org.apache.nifi.util.NiFiProperties; +import org.apache.zookeeper.server.DatadirCleanupManager; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZKDatabase; @@ -47,6 +48,7 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain { private FileTxnSnapLog transactionLog; private ZooKeeperServer embeddedZkServer; private QuorumPeer quorumPeer; + private DatadirCleanupManager datadirCleanupManager; private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException { quorumPeerConfig = new QuorumPeerConfig(); @@ -58,6 +60,13 @@ public synchronized void start() throws IOException { return; } + if (quorumPeerConfig.isDistributed() && quorumPeerConfig.getPurgeInterval() > 0) { + datadirCleanupManager = new DatadirCleanupManager(quorumPeerConfig + .getDataDir(), quorumPeerConfig.getDataLogDir(), quorumPeerConfig + .getSnapRetainCount(), quorumPeerConfig.getPurgeInterval()); + datadirCleanupManager.start(); + } + if (quorumPeerConfig.isDistributed()) { startDistributed(); } else { @@ -153,6 +162,10 @@ public synchronized void shutdown() { if (embeddedZkServer != null && embeddedZkServer.isRunning()) { embeddedZkServer.shutdown(); } + + if (datadirCleanupManager != null) { + datadirCleanupManager.shutdown(); + } } }