Skip to content

Commit

Permalink
Support EmbeddedZkServer
Browse files Browse the repository at this point in the history
  • Loading branch information
yaooqinn committed Jun 12, 2020
1 parent 15d1795 commit 5100d8a
Show file tree
Hide file tree
Showing 20 changed files with 447 additions and 21 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ ldap
kyuubi-server/kyuubi-kdc/
metrics/report.json
metrics/.report.json.crc
/kyuubi-ha/embedded_zookeeper/
4 changes: 2 additions & 2 deletions kyuubi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>0.8.0-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand All @@ -36,7 +36,7 @@
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>

<dependency>
Expand Down
2 changes: 1 addition & 1 deletion kyuubi-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
61 changes: 60 additions & 1 deletion kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.kyuubi

import java.io.{File, InputStreamReader, IOException}
import java.net.{URI, URISyntaxException}
import java.nio.charset.StandardCharsets
import java.util.Properties
import java.util.{Properties, UUID}

import scala.collection.JavaConverters._
import scala.util.{Success, Try}

private[kyuubi] object Utils extends Logging {

Expand Down Expand Up @@ -61,4 +63,61 @@ private[kyuubi] object Utils extends Logging {
}
}.getOrElse(Map.empty)
}


/**
* Return a well-formed URI for the file described by a user input string.
*
* If the supplied path does not contain a scheme, or is a relative path, it will be
* converted into an absolute path with a file:// scheme.
*/
def resolveURI(path: String): URI = {
try {
val uri = new URI(path)
if (uri.getScheme != null) {
return uri
}
// make sure to handle if the path has a fragment (applies to yarn
// distributed cache)
if (uri.getFragment != null) {
val absoluteURI = new File(uri.getPath).getAbsoluteFile.toURI
return new URI(absoluteURI.getScheme, absoluteURI.getHost, absoluteURI.getPath,
uri.getFragment)
}
} catch {
case _: URISyntaxException =>
}
new File(path).getAbsoluteFile.toURI
}

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10

/**
* Create a directory inside the given parent directory. The directory is guaranteed to be
* newly created, and is not marked for automatic deletion.
*/
def createDirectory(root: String, namePrefix: String = "kyuubi"): File = {
(0 until MAX_DIR_CREATION_ATTEMPTS).foreach { _ =>
val dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
Try { dir.mkdirs() } match {
case Success(_) => return dir
case _ =>
}
}
throw new IOException("Failed to create a temp directory (under " + root + ") after " +
MAX_DIR_CREATION_ATTEMPTS + " attempts!")
}

/**
* Create a temporary directory inside the given parent directory. The directory will be
* automatically deleted when the VM shuts down.
*/
def createTempDir(
root: String = System.getProperty("java.io.tmpdir"),
namePrefix: String = "kyuubi"): File = {
val dir = createDirectory(root, namePrefix)
dir.deleteOnExit()
dir
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.kyuubi.Utils

case class KyuubiConf(loadSysDefault: Boolean = true) {
private val settings = new ConcurrentHashMap[String, String]()
private lazy val reader: ConfigProvider = new ConfigProvider(settings)

if (loadSysDefault) {
loadSysProps()
Expand All @@ -35,13 +36,22 @@ case class KyuubiConf(loadSysDefault: Boolean = true) {
this
}

def set[T](entry: ConfigEntry[T], value: T): KyuubiConf = {
settings.put(entry.key, entry.strConverter(value))
this
}

def set(key: String, value: String): KyuubiConf = {
require(key != null)
require(value != null)
settings.put(key, value)
this
}

def get[T](config: ConfigEntry[T]): T = {
config.readFrom(reader)
}

}

object KyuubiConf {
Expand All @@ -51,4 +61,27 @@ object KyuubiConf {
/** the default file that contains kyuubi properties */
final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf"
final val KYUUBI_HOME = "KYUUBI_HOME"

def buildConf(key: String): ConfigBuilder = ConfigBuilder(KYUUBI_PREFIX + key)

val EMBEDDED_ZK_PORT: ConfigEntry[Int] =
buildConf("embedded.zk.port")
.doc("The port of the embedded zookeeper server")
.version("1.0.0")
.intConf.checkValue(_ >= 0, s"The value of $EMBEDDED_ZK_PORT must be >= 0")
.createWithDefault(2181)

val EMBEDDED_ZK_TEMP_DIR: ConfigEntry[String] =
buildConf("embedded.zk.directory")
.doc("The temporary directory for the embedded zookeeper server")
.version("1.0.0")
.stringConf
.createWithDefault(Utils.resolveURI("embedded_zookeeper").getRawPath)

val HA_ZK_QUORUM: OptionalConfigEntry[Seq[String]] =
buildConf("ha.zk.quorum")
.version("1.0.0")
.stringConf
.toSequence
.createOptional
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ import org.apache.kyuubi.config.KyuubiConf

abstract class AbstractService(serviceName: String) extends Service with Logging {
import ServiceState._
private var conf: KyuubiConf = _
private var state: ServiceState = NEW
private var startTime: Long = _
protected var conf: KyuubiConf = _
protected var state: ServiceState = LATENT
protected var startTime: Long = _

/**
* Initialize the service.
*
* The transition must be from [[NEW]]to [[INITIALIZED]] unless the
* The transition must be from [[LATENT]]to [[INITIALIZED]] unless the
* operation failed and an exception was raised.
*
* @param conf the configuration of the service
*/
override def initialize(conf: KyuubiConf): Unit = {
ensureCurrentState(NEW)
ensureCurrentState(LATENT)
this.conf = conf
changeState(INITIALIZED)
info(s"Service[$serviceName] is initialized.")
Expand All @@ -62,7 +62,7 @@ abstract class AbstractService(serviceName: String) extends Service with Logging
*/
override def stop(): Unit = {
state match {
case NEW | INITIALIZED | STOPPED =>
case LATENT | INITIALIZED | STOPPED =>
case _ =>
ensureCurrentState(STARTED)
changeState(STOPPED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ trait Service {
/**
* Initialize the service.
*
* The transition must be from [[NEW]]to [[INITIALIZED]] unless the
* The transition must be from [[LATENT]]to [[INITIALIZED]] unless the
* operation failed and an exception was raised.
*
* @param conf the configuration of the service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private[kyuubi] object ServiceState extends Enumeration {
type ServiceState = Value

val
/** Constructed but not initialized */ NEW,
/** Constructed but not initialized */ LATENT,
/** Initialized but not started or stopped */ INITIALIZED,
/** Started and not stopped */ STARTED,
/** Stopped. No further state transitions are permitted */ STOPPED = Value
Expand Down
36 changes: 36 additions & 0 deletions kyuubi-common/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the file target/unit-tests.log
log4j.rootLogger=DEBUG, CA, FA

#Console Appender
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
log4j.appender.CA.Threshold = WARN


#File Appender
log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.append=false
log4j.appender.FA.file=target/unit-tests.log
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n

# Set the logger level of File Appender to WARN
log4j.appender.FA.Threshold = DEBUG
21 changes: 21 additions & 0 deletions kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,25 @@ class UtilsSuite extends KyuubiFunSuite {
assert(props("kyuubi.yes") === "yes")
assert(!props.contains("kyuubi.no"))
}

test("resolveURI") {
def assertResolves(before: String, after: String): Unit = {
// This should test only single paths
assert(before.split(",").length === 1)
def resolve(uri: String): String = Utils.resolveURI(uri).toString
assert(resolve(before) === after)
assert(resolve(after) === after)
// Repeated invocations of resolveURI should yield the same result
assert(resolve(resolve(after)) === after)
assert(resolve(resolve(resolve(after))) === after)
}
assertResolves("hdfs:/root/spark.jar", "hdfs:/root/spark.jar")
assertResolves("hdfs:///root/spark.jar#app.jar", "hdfs:///root/spark.jar#app.jar")
assertResolves("file:/C:/path/to/file.txt", "file:/C:/path/to/file.txt")
assertResolves("file:///C:/path/to/file.txt", "file:///C:/path/to/file.txt")
assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt")
assertResolves("file:foo", "file:foo")
assertResolves("file:foo:baby", "file:foo:baby")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.config

import org.apache.kyuubi.KyuubiFunSuite

class KyuubiConfSuite extends KyuubiFunSuite {

import KyuubiConf._

test("kyuubi conf defaults") {
val conf = new KyuubiConf()
assert(conf.get(EMBEDDED_ZK_PORT) === 2181)
assert(conf.get(EMBEDDED_ZK_TEMP_DIR).endsWith("embedded_zookeeper"))
assert(conf.get(HA_ZK_QUORUM) === None)
}
}
59 changes: 59 additions & 0 deletions kyuubi-ha/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>kyuubi-ha</artifactId>
<packaging>jar</packaging>
<name>Kyuubi Project High Availability</name>

<dependencies>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>

</project>

0 comments on commit 5100d8a

Please sign in to comment.