Skip to content

Commit

Permalink
Add fully Retry Policy
Browse files Browse the repository at this point in the history
  • Loading branch information
yaooqinn committed Jun 19, 2020
1 parent c2271ac commit 829da50
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 93 deletions.
5 changes: 5 additions & 0 deletions kyuubi-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -96,44 +96,6 @@ object KyuubiConf {
.stringConf
.createWithDefault(Utils.resolveURI("embedded_zookeeper").getRawPath)

val HA_ZK_QUORUM: ConfigEntry[String] = buildConf("ha.zookeeper.quorum")
.doc("The connection string for the zookeeper ensemble")
.version("1.0.0")
.stringConf
.createWithDefault("")

val HA_ZK_NAMESPACE: ConfigEntry[String] = buildConf("ha.zookeeper.namespace")
.doc("The connection string for the zookeeper ensemble")
.version("1.0.0")
.stringConf
.createWithDefault("")

val HA_ZK_CONNECTION_MAX_RETRIES: ConfigEntry[Int] =
buildConf("ha.zookeeper.connection.max.retries")
.doc("Max retry times for connecting to the zookeeper ensemble")
.version("1.0.0")
.intConf
.createWithDefault(3)

val HA_ZK_CONNECTION_RETRY_WAIT: ConfigEntry[Int] =
buildConf("ha.zookeeper.connection.retry.wait")
.doc("Initial amount of time to wait between retries to the zookeeper ensemble")
.version("1.0.0")
.intConf
.createWithDefault(1000)

val HA_ZK_CONNECTION_TIMEOUT: ConfigEntry[Int] = buildConf("ha.zookeeper.connection.timeout")
.doc("The timeout(ms) of creating the connection to the zookeeper ensemble")
.version("1.0.0")
.intConf
.createWithDefault(60 * 1000)

val HA_ZK_SESSION_TIMEOUT: ConfigEntry[Int] = buildConf("ha.zookeeper.session.timeout")
.doc("The timeout(ms) of a connected session to be idled")
.version("1.0.0")
.intConf
.createWithDefault(60 * 1000)

val SERVER_PRINCIPAL: OptionalConfigEntry[String] = buildConf("server.principal")
.doc("")
.version("1.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,5 @@ class KyuubiConfSuite extends KyuubiFunSuite {
val conf = new KyuubiConf()
assert(conf.get(EMBEDDED_ZK_PORT) === 2181)
assert(conf.get(EMBEDDED_ZK_TEMP_DIR).endsWith("embedded_zookeeper"))
assert(conf.get(HA_ZK_QUORUM).isEmpty)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.ha

import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf}
import org.apache.kyuubi.ha.client.RetryPolicies

object HighAvailabilityConf {

private def buildConf(key: String): ConfigBuilder = KyuubiConf.buildConf(key)

val HA_ZK_QUORUM: ConfigEntry[String] = buildConf("ha.zookeeper.quorum")
.doc("The connection string for the zookeeper ensemble")
.version("1.0.0")
.stringConf
.createWithDefault("")

val HA_ZK_NAMESPACE: ConfigEntry[String] = buildConf("ha.zookeeper.namespace")
.doc("The root directory for the service to deploy its instance uri. Additionally, it will" +
" creates a -[username] suffixed root directory for each application")
.version("1.0.0")
.stringConf
.createWithDefault("kyuubi")

val HA_ZK_CONN_MAX_RETRIES: ConfigEntry[Int] =
buildConf("ha.zookeeper.connection.max.retries")
.doc("Max retry times for connecting to the zookeeper ensemble")
.version("1.0.0")
.intConf
.createWithDefault(3)

val HA_ZK_CONN_BASE_RETRY_WAIT: ConfigEntry[Int] =
buildConf("ha.zookeeper.connection.base.retry.wait")
.doc("Initial amount of time to wait between retries to the zookeeper ensemble")
.version("1.0.0")
.intConf
.createWithDefault(1000)

val HA_ZK_CONN_MAX_RETRY_WAIT: ConfigEntry[Int] =
buildConf("ha.zookeeper.connection.max.retry.wait")
.doc(s"Max amount of time to wait between retries for" +
s" ${RetryPolicies.BONDED_EXPONENTIAL_BACKOFF} policy can reach, or max time until" +
s" elapsed for ${RetryPolicies.UNTIL_ELAPSED} policy to connect the zookeeper ensemble")
.version("1.0.0")
.intConf
.createWithDefault(30 * 1000)

val HA_ZK_CONN_TIMEOUT: ConfigEntry[Int] = buildConf("ha.zookeeper.connection.timeout")
.doc("The timeout(ms) of creating the connection to the zookeeper ensemble")
.version("1.0.0")
.intConf
.createWithDefault(15 * 1000)

val HA_ZK_SESSION_TIMEOUT: ConfigEntry[Int] = buildConf("ha.zookeeper.session.timeout")
.doc("The timeout(ms) of a connected session to be idled")
.version("1.0.0")
.intConf
.createWithDefault(60 * 1000)

val HA_ZK_CONN_RETRY_POLICY: ConfigEntry[String] =
buildConf("ha.zookeeper.connection.retry.policy")
.doc("The retry policy for connecting to the zookeeper ensemble, all candidates are:" +
s" ${RetryPolicies.values.mkString("[", ", ", "]")}")
.stringConf
.checkValues(RetryPolicies.values.map(_.toString))
.createWithDefault(RetryPolicies.EXPONENTIAL_BACKOFF.toString)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.ha.client

object RetryPolicies extends Enumeration {

type RetryPolicy = Value

val
/** A retry policy that retries only once */ ONE_TIME,

/** A retry policy that retries a max number of times */ N_TIME,

/** A retry policy that retries a set number of times with increasing sleep time between
* retries
*/
EXPONENTIAL_BACKOFF,

/** A retry policy that retries a set number of times with an increasing (up to a maximum
* bound) sleep time between retries
*/
BONDED_EXPONENTIAL_BACKOFF,

/** A retry policy that retries until a given amount of time elapses */ UNTIL_ELAPSED = Value

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import javax.security.auth.login.Configuration

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.retry.{BoundedExponentialBackoffRetry, ExponentialBackoffRetry, RetryNTimes, RetryOneTime, RetryUntilElapsed}
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.JaasConfiguration
import org.apache.zookeeper.{KeeperException, WatchedEvent, Watcher}
Expand Down Expand Up @@ -69,7 +69,7 @@ class ServiceDiscovery private (
.forPath(s"/$namespace")
} catch {
case _: NodeExistsException => // do nothing
case e: KeeperException => throw e
case e: KeeperException => throw new KyuubiException(e)
}
super.initialize(conf)
}
Expand All @@ -92,7 +92,8 @@ class ServiceDiscovery private (
val watcher = new DeRegisterWatcher
if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) {
// No node exists, throw exception
throw new KyuubiException("Unable to create znode for this Kyuubi instance on ZooKeeper.")
throw new KyuubiException(s"Unable to create znode for this Kyuubi instance[$instance]" +
s" on ZooKeeper.")
}
info("Created a serviceNode on ZooKeeper for KyuubiServer uri: " + instance)
} catch {
Expand Down Expand Up @@ -144,20 +145,33 @@ class ServiceDiscovery private (
}

object ServiceDiscovery {
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.ha.HighAvailabilityConf._
import RetryPolicies._

private final val DEFAULT_ACL_PROVIDER = ZooKeeperACLProvider()

/**
* Create a [[CuratorFramework]] instance to be used as the ZooKeeper client
* Use the [[ZooKeeperACLProvider]] to create appropriate ACLs
*/
def newZookeeperClient(conf: KyuubiConf): CuratorFramework = {
val connectionStr = conf.get(HA_ZK_QUORUM)
val sessionTimeout = conf.get(HA_ZK_SESSION_TIMEOUT)
val connectionTimeout = conf.get(HA_ZK_CONNECTION_TIMEOUT)
val baseSleepTime = conf.get(HA_ZK_CONNECTION_RETRY_WAIT)
val maxRetries = conf.get(HA_ZK_CONNECTION_MAX_RETRIES)
val retryPolicy = new ExponentialBackoffRetry(baseSleepTime, maxRetries)
val connectionTimeout = conf.get(HA_ZK_CONN_TIMEOUT)
val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT)
val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
val retryPolicyName = conf.get(HA_ZK_CONN_RETRY_POLICY)

val retryPolicy = RetryPolicies.withName(retryPolicyName) match {
case ONE_TIME => new RetryOneTime(baseSleepTime)
case N_TIME => new RetryNTimes(maxRetries, baseSleepTime)
case BONDED_EXPONENTIAL_BACKOFF =>
new BoundedExponentialBackoffRetry(baseSleepTime, maxSleepTime, maxRetries)
case UNTIL_ELAPSED => new RetryUntilElapsed(maxSleepTime, baseSleepTime)
case _ => new ExponentialBackoffRetry(baseSleepTime, maxRetries)
}

val client = CuratorFrameworkFactory.builder()
.connectString(connectionStr)
.sessionTimeoutMs(sessionTimeout)
Expand All @@ -179,12 +193,12 @@ object ServiceDiscovery {
*/
@throws[Exception]
def setUpZooKeeperAuth(conf: KyuubiConf): Unit = {
val keyTabFile = conf.get(SERVER_KEYTAB)
val maybePrincipal = conf.get(SERVER_PRINCIPAL)
val keyTabFile = conf.get(KyuubiConf.SERVER_KEYTAB)
val maybePrincipal = conf.get(KyuubiConf.SERVER_PRINCIPAL)
val kerberized = maybePrincipal.isDefined && keyTabFile.isDefined
if (UserGroupInformation.isSecurityEnabled && kerberized) {
if (!new File(keyTabFile.get).exists()) {
throw new IOException(s"${SERVER_KEYTAB.key} does not exists")
throw new IOException(s"${KyuubiConf.SERVER_KEYTAB.key} does not exists")
}
System.setProperty("zookeeper.sasl.clientconfig", "KyuubiZooKeeperClient")
var principal = maybePrincipal.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.kyuubi.{KerberizedTestHelper, KyuubiFunSuite}
import org.apache.kyuubi.KYUUBI_VERSION
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.server.EmbeddedZkServer
import org.apache.kyuubi.service.ServiceState

Expand All @@ -34,15 +34,15 @@ class ServiceDiscoverySuite extends KyuubiFunSuite with KerberizedTestHelper {
val conf: KyuubiConf = KyuubiConf()

override def beforeAll(): Unit = {
conf.set(EMBEDDED_ZK_PORT, 0)
conf.set(KyuubiConf.EMBEDDED_ZK_PORT, 0)
zkServer.initialize(conf)
zkServer.start()
super.beforeAll()
}

override def afterAll(): Unit = {
conf.unset(SERVER_KEYTAB)
conf.unset(SERVER_PRINCIPAL)
conf.unset(KyuubiConf.SERVER_KEYTAB)
conf.unset(KyuubiConf.SERVER_PRINCIPAL)
conf.unset(HA_ZK_QUORUM)
zkServer.stop()
super.afterAll()
Expand All @@ -53,8 +53,8 @@ class ServiceDiscoverySuite extends KyuubiFunSuite with KerberizedTestHelper {
val keytab = File.createTempFile("kentyao", ".keytab")
val principal = "kentyao/_HOST@apache.org"

conf.set(SERVER_KEYTAB, keytab.getCanonicalPath)
conf.set(SERVER_PRINCIPAL, principal)
conf.set(KyuubiConf.SERVER_KEYTAB, keytab.getCanonicalPath)
conf.set(KyuubiConf.SERVER_PRINCIPAL, principal)

ServiceDiscovery.setUpZooKeeperAuth(conf)
val configuration = Configuration.getConfiguration
Expand All @@ -66,17 +66,17 @@ class ServiceDiscoverySuite extends KyuubiFunSuite with KerberizedTestHelper {
assert(options("principal") === "kentyao/localhost@apache.org")
assert(options("useKeyTab").toString.toBoolean)

conf.set(SERVER_KEYTAB, keytab.getName)
conf.set(KyuubiConf.SERVER_KEYTAB, keytab.getName)
val e = intercept[IOException](ServiceDiscovery.setUpZooKeeperAuth(conf))
assert(e.getMessage === s"${SERVER_KEYTAB.key} does not exists")
assert(e.getMessage === s"${KyuubiConf.SERVER_KEYTAB.key} does not exists")
}
}

test("publish instance to embedded zookeeper server") {

conf
.unset(SERVER_KEYTAB)
.unset(SERVER_PRINCIPAL)
.unset(KyuubiConf.SERVER_KEYTAB)
.unset(KyuubiConf.SERVER_PRINCIPAL)
.set(HA_ZK_QUORUM, zkServer.getConnectString)

val namespace = "kyuubiserver"
Expand Down

0 comments on commit 829da50

Please sign in to comment.