17
17
18
18
package org .apache .kyuubi .ha .client
19
19
20
- import java .io .IOException
21
20
import java .nio .charset .StandardCharsets
22
- import java .util .concurrent .TimeUnit
23
- import java .util .concurrent .atomic .AtomicBoolean
24
21
25
22
import scala .collection .JavaConverters ._
26
23
27
24
import com .google .common .annotations .VisibleForTesting
28
25
import org .apache .curator .framework .CuratorFramework
29
- import org .apache .curator .framework .recipes .nodes .PersistentNode
30
- import org .apache .curator .framework .state .{ConnectionState , ConnectionStateListener }
31
- import org .apache .curator .framework .state .ConnectionState .{CONNECTED , LOST , RECONNECTED }
32
26
import org .apache .curator .utils .ZKPaths
33
- import org .apache .zookeeper .{CreateMode , KeeperException , WatchedEvent , Watcher }
34
- import org .apache .zookeeper .CreateMode .PERSISTENT
35
- import org .apache .zookeeper .KeeperException .NodeExistsException
36
27
37
- import org .apache .kyuubi .{ KYUUBI_VERSION , KyuubiException , Logging }
28
+ import org .apache .kyuubi .Logging
38
29
import org .apache .kyuubi .config .KyuubiConf
39
30
import org .apache .kyuubi .ha .HighAvailabilityConf ._
31
+ import org .apache .kyuubi .ha .client .zookeeper .ServiceDiscoveryClient
32
+ import org .apache .kyuubi .ha .client .zookeeper .ServiceDiscoveryClient .createServiceNode
40
33
import org .apache .kyuubi .service .{AbstractService , FrontendService }
41
- import org .apache .kyuubi .util .{KyuubiHadoopUtils , ThreadUtils }
42
34
43
35
/**
44
36
* A abstract service for service discovery
@@ -48,92 +40,31 @@ import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
48
40
*/
49
41
abstract class ServiceDiscovery (
50
42
name : String ,
51
- fe : FrontendService ) extends AbstractService (name) {
43
+ val fe : FrontendService ) extends AbstractService (name) {
52
44
53
- import ServiceDiscovery ._
54
- import ZooKeeperClientProvider ._
45
+ private var _discoveryClient : ServiceDiscoveryClient = _
55
46
56
- private var _zkClient : CuratorFramework = _
57
- private var _serviceNode : PersistentNode = _
58
-
59
- /**
60
- * a pre-defined namespace used to publish the instance of the associate service
61
- */
62
- private var _namespace : String = _
63
-
64
- def zkClient : CuratorFramework = _zkClient
65
-
66
- def serviceNode : PersistentNode = _serviceNode
67
-
68
- def namespace : String = _namespace
47
+ def discoveryClient : ServiceDiscoveryClient = _discoveryClient
69
48
70
49
override def initialize (conf : KyuubiConf ): Unit = {
71
50
this .conf = conf
72
- _namespace = conf.get(HA_ZK_NAMESPACE )
73
- val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT )
74
- val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES )
75
- _zkClient = buildZookeeperClient(conf)
76
- zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener {
77
- private val isConnected = new AtomicBoolean (false )
78
51
79
- override def stateChanged (client : CuratorFramework , newState : ConnectionState ): Unit = {
80
- info(s " Zookeeper client connection state changed to: $newState" )
81
- newState match {
82
- case CONNECTED | RECONNECTED => isConnected.set(true )
83
- case LOST =>
84
- isConnected.set(false )
85
- val delay = maxRetries.toLong * maxSleepTime
86
- connectionChecker.schedule(
87
- new Runnable {
88
- override def run (): Unit = if (! isConnected.get()) {
89
- error(s " Zookeeper client connection state changed to: $newState, but failed to " +
90
- s " reconnect in ${delay / 1000 } seconds. Give up retry. " )
91
- stopGracefully()
92
- }
93
- },
94
- delay,
95
- TimeUnit .MILLISECONDS )
96
- case _ =>
97
- }
98
- }
99
- })
100
- zkClient.start()
52
+ _discoveryClient = new ServiceDiscoveryClient (this )
53
+ discoveryClient.createClient(conf)
54
+
101
55
super .initialize(conf)
102
56
}
103
57
104
58
override def start (): Unit = {
105
- val instance = fe.connectionUrl
106
- _serviceNode = createServiceNode(conf, zkClient, namespace, instance)
107
- // Set a watch on the serviceNode
108
- val watcher = new DeRegisterWatcher
109
- if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null ) {
110
- // No node exists, throw exception
111
- throw new KyuubiException (s " Unable to create znode for this Kyuubi " +
112
- s " instance[ ${fe.connectionUrl}] on ZooKeeper. " )
113
- }
59
+ discoveryClient.registerService(conf)
114
60
super .start()
115
61
}
116
62
117
63
override def stop (): Unit = {
118
- closeServiceNode()
119
- if (zkClient != null ) zkClient.close()
64
+ discoveryClient.closeClient()
120
65
super .stop()
121
66
}
122
67
123
- // close the EPHEMERAL_SEQUENTIAL node in zk
124
- protected def closeServiceNode (): Unit = {
125
- if (_serviceNode != null ) {
126
- try {
127
- _serviceNode.close()
128
- } catch {
129
- case e : IOException =>
130
- error(" Failed to close the persistent ephemeral znode" + serviceNode.getActualPath, e)
131
- } finally {
132
- _serviceNode = null
133
- }
134
- }
135
- }
136
-
137
68
// stop the server genteelly
138
69
def stopGracefully (): Unit = {
139
70
stop()
@@ -143,23 +74,10 @@ abstract class ServiceDiscovery(
143
74
fe.serverable.stop()
144
75
}
145
76
146
- class DeRegisterWatcher extends Watcher {
147
- override def process (event : WatchedEvent ): Unit = {
148
- if (event.getType == Watcher .Event .EventType .NodeDeleted ) {
149
- warn(s " This Kyuubi instance ${fe.connectionUrl} is now de-registered from " +
150
- s " ZooKeeper. The server will be shut down after the last client session completes. " )
151
- stopGracefully()
152
- }
153
- }
154
- }
155
-
156
77
}
157
78
158
79
object ServiceDiscovery extends Logging {
159
80
160
- final private lazy val connectionChecker =
161
- ThreadUtils .newDaemonSingleThreadScheduledExecutor(" zk-connection-checker" )
162
-
163
81
def supportServiceDiscovery (conf : KyuubiConf ): Boolean = {
164
82
val zkEnsemble = conf.get(HA_ZK_QUORUM )
165
83
zkEnsemble != null && zkEnsemble.nonEmpty
@@ -234,97 +152,6 @@ object ServiceDiscovery extends Logging {
234
152
external : Boolean = false ): String = {
235
153
createServiceNode(conf, zkClient, namespace, instance, version, external).getActualPath
236
154
}
237
-
238
- private def createServiceNode (
239
- conf : KyuubiConf ,
240
- zkClient : CuratorFramework ,
241
- namespace : String ,
242
- instance : String ,
243
- version : Option [String ] = None ,
244
- external : Boolean = false ): PersistentNode = {
245
- val ns = ZKPaths .makePath(null , namespace)
246
- try {
247
- zkClient
248
- .create()
249
- .creatingParentsIfNeeded()
250
- .withMode(PERSISTENT )
251
- .forPath(ns)
252
- } catch {
253
- case _ : NodeExistsException => // do nothing
254
- case e : KeeperException =>
255
- throw new KyuubiException (s " Failed to create namespace ' $ns' " , e)
256
- }
257
-
258
- val session = conf.get(HA_ZK_ENGINE_REF_ID )
259
- .map(refId => s " refId= $refId; " ).getOrElse(" " )
260
- val pathPrefix = ZKPaths .makePath(
261
- namespace,
262
- s " serviceUri= $instance;version= ${version.getOrElse(KYUUBI_VERSION )}; ${session}sequence= " )
263
- var serviceNode : PersistentNode = null
264
- val createMode =
265
- if (external) CreateMode .PERSISTENT_SEQUENTIAL
266
- else CreateMode .EPHEMERAL_SEQUENTIAL
267
- val znodeData =
268
- if (conf.get(HA_ZK_PUBLIST_CONFIGS ) && session.isEmpty) {
269
- addConfsToPublish(conf, instance)
270
- } else {
271
- instance
272
- }
273
- try {
274
- serviceNode = new PersistentNode (
275
- zkClient,
276
- createMode,
277
- false ,
278
- pathPrefix,
279
- znodeData.getBytes(StandardCharsets .UTF_8 ))
280
- serviceNode.start()
281
- val znodeTimeout = conf.get(HA_ZK_NODE_TIMEOUT )
282
- if (! serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit .MILLISECONDS )) {
283
- throw new KyuubiException (s " Max znode creation wait time $znodeTimeout s exhausted " )
284
- }
285
- info(s " Created a ${serviceNode.getActualPath} on ZooKeeper for KyuubiServer uri: " + instance)
286
- } catch {
287
- case e : Exception =>
288
- if (serviceNode != null ) {
289
- serviceNode.close()
290
- }
291
- throw new KyuubiException (
292
- s " Unable to create a znode for this server instance: $instance" ,
293
- e)
294
- }
295
- serviceNode
296
- }
297
-
298
- /**
299
- * Refer to the implementation of HIVE-11581 to simplify user connection parameters.
300
- * https://issues.apache.org/jira/browse/HIVE-11581
301
- * HiveServer2 should store connection params in ZK
302
- * when using dynamic service discovery for simpler client connection string.
303
- */
304
- private def addConfsToPublish (conf : KyuubiConf , instance : String ): String = {
305
- if (! instance.contains(" :" )) {
306
- return instance
307
- }
308
- val hostPort = instance.split(" :" , 2 )
309
- val confsToPublish = collection.mutable.Map [String , String ]()
310
-
311
- // Hostname
312
- confsToPublish += (" hive.server2.thrift.bind.host" -> hostPort(0 ))
313
- // Transport mode
314
- confsToPublish += (" hive.server2.transport.mode" -> " binary" )
315
- // Transport specific confs
316
- confsToPublish += (" hive.server2.thrift.port" -> hostPort(1 ))
317
- confsToPublish += (" hive.server2.thrift.sasl.qop" -> conf.get(KyuubiConf .SASL_QOP ))
318
- // Auth specific confs
319
- val authenticationMethod = conf.get(KyuubiConf .AUTHENTICATION_METHOD ).mkString(" ," )
320
- confsToPublish += (" hive.server2.authentication" -> authenticationMethod)
321
- if (authenticationMethod.equalsIgnoreCase(" KERBEROS" )) {
322
- confsToPublish += (" hive.server2.authentication.kerberos.principal" ->
323
- conf.get(KyuubiConf .SERVER_PRINCIPAL ).map(KyuubiHadoopUtils .getServerPrincipal)
324
- .getOrElse(" " ))
325
- }
326
- confsToPublish.map { case (k, v) => k + " =" + v }.mkString(" ;" )
327
- }
328
155
}
329
156
330
157
case class ServiceNodeInfo (
0 commit comments