Permalink
Browse files

Amendment to OOZIE-1847

  • Loading branch information...
egashira committed Oct 7, 2014
1 parent b87686b commit 9f150b0b150c813c7a973046cf2b864ee5e1d27c
@@ -31,20 +31,23 @@
public class ZKConnectionListener implements ConnectionStateListener {
private XLog LOG = XLog.getLog(getClass());
+ private static ConnectionState connectionState;
+ public static final String CONF_SHUTDOWN_ON_TIMEOUT = "oozie.zookeeper.server.shutdown.ontimeout";
public ZKConnectionListener() {
LOG.info("ZKConnectionListener started");
}
@Override
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
+ connectionState = newState;
LOG.trace("ZK connection status = " + newState.toString());
-// if (newState == ConnectionState.CONNECTED) {
-// ZK connected
-// }
+ // if (newState == ConnectionState.CONNECTED) {
+ // ZK connected
+ // }
if (newState == ConnectionState.SUSPENDED) {
LOG.warn("ZK connection is suspended, waiting for reconnect. If connection doesn't reconnect before "
- + ZKUtils.getZKConnectionTimeout() + " Oozie server will shutdown itself");
+ + ZKUtils.getZKConnectionTimeout() + " (sec) Oozie server will shutdown itself");
}
if (newState == ConnectionState.RECONNECTED) {
@@ -53,10 +56,16 @@ public void stateChanged(final CuratorFramework client, final ConnectionState ne
}
if (newState == ConnectionState.LOST) {
- LOG.fatal("ZK is connection is not reconnected in " + ZKUtils.getZKConnectionTimeout()
- + ", shutting down Oozie server");
- Services.get().destroy();
- System.exit(1);
+ LOG.fatal("ZK is not reconnected in " + ZKUtils.getZKConnectionTimeout());
+ if (Services.get().getConf().getBoolean(CONF_SHUTDOWN_ON_TIMEOUT, true)) {
+ LOG.fatal("Shutting down Oozie server");
+ Services.get().destroy();
+ System.exit(1);
+ }
}
}
+
+ public static ConnectionState getZKConnectionState() {
+ return connectionState;
+ }
}
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.oozie.service;
import java.util.ArrayList;
@@ -26,9 +25,11 @@
import java.util.regex.Pattern;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.event.listener.ZKConnectionListener;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
@@ -82,7 +83,7 @@ public void init(Services services) throws ServiceException {
*/
@Override
public void destroy() {
- if (leaderLatch != null) {
+ if (leaderLatch != null && ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) {
IOUtils.closeSafely(leaderLatch);
}
if (zk != null) {
@@ -15,24 +15,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.oozie.service;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
+
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
+import org.apache.oozie.event.listener.ZKConnectionListener;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.ZKUtils;
+
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
+
import org.apache.curator.framework.recipes.locks.ChildReaper;
import org.apache.curator.framework.recipes.locks.Reaper;
+import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.ThreadUtils;
+
import com.google.common.annotations.VisibleForTesting;
/**
@@ -78,15 +83,14 @@ public void init(Services services) throws ServiceException {
*/
@Override
public void destroy() {
- if (reaper != null) {
+ if (reaper != null && ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) {
try {
reaper.close();
}
catch (IOException e) {
LOG.error("Error closing childReaper", e);
}
}
-
if (zk != null) {
zk.unregister(this);
}
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.oozie.util;
import com.google.common.annotations.VisibleForTesting;
@@ -35,6 +34,7 @@
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.x.discovery.ServiceCache;
@@ -163,7 +163,9 @@ public synchronized void unregister(Object user) {
// If there are no more classes using ZooKeeper, we should teardown everything.
users.remove(user);
if (users.isEmpty() && zk != null) {
- zk.teardown();
+ if (ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) {
+ zk.teardown();
+ }
zk = null;
}
}
@@ -172,7 +174,7 @@ private void createClient() throws Exception {
// Connect to the ZooKeeper server
RetryPolicy retryPolicy = ZKUtils.getRetryPloicy();
String zkConnectionString = Services.get().getConf().get(ZK_CONNECTION_STRING, "localhost:2181");
- String zkNamespace = Services.get().getConf().get(ZK_NAMESPACE, "oozie");
+ String zkNamespace = getZKNameSpace();
zkConnectionTimeout = Services.get().getConf().getInt(ZK_CONNECTION_TIMEOUT, 180);
ACLProvider aclProvider;
@@ -405,6 +407,14 @@ private String getServicePrincipal() throws ServiceException {
public static RetryPolicy getRetryPloicy() {
return new ExponentialBackoffRetry(1000, 3);
}
+
+ /**
+ * Returns the configured ZK namespace.
+ * @return the value of oozie.zookeeper.namespace, or "oozie" if it is not set
+ */
+ public static String getZKNameSpace() {
+ return Services.get().getConf().get(ZK_NAMESPACE, "oozie");
+ }
/**
* Returns the ZK connection timeout.
* @return the ZK connection timeout (in sec)
@@ -2069,7 +2069,16 @@
<value>180</value>
<description>
Default ZK connection timeout (in sec). If the connection is lost for longer than this timeout, the Oozie server will shut down
- itself.
+ itself if oozie.zookeeper.server.shutdown.ontimeout is true.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.zookeeper.server.shutdown.ontimeout</name>
+ <value>true</value>
+ <description>
+ If true, the Oozie server will shut itself down when the ZK
+ connection timeout expires. If false, the timeout is only logged and the server keeps running.
</description>
</property>
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.oozie.test;
import java.io.IOException;
@@ -35,6 +34,7 @@
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer;
+import org.apache.oozie.event.listener.ZKConnectionListener;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.FixedJsonInstanceSerializer;
import org.apache.oozie.util.ZKUtils;
@@ -89,6 +89,7 @@ private void setUpZK() throws Exception {
zkServer = setupZKServer();
Services.get().getConf().set("oozie.zookeeper.connection.string", zkServer.getConnectString());
Services.get().getConf().set("oozie.instance.id", ZK_ID);
+ Services.get().getConf().setBoolean(ZKConnectionListener.CONF_SHUTDOWN_ON_TIMEOUT, false);
createClient();
createServiceDiscovery();
}

0 comments on commit 9f150b0

Please sign in to comment.