From 477b438ff13eef4f73ba7a7c533ae7a963262b39 Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Fri, 23 Aug 2019 17:25:11 -0700 Subject: [PATCH 01/28] Added support for Active/Passive LivyHA --- conf/livy.conf.template | 16 ++- .../main/scala/org/apache/livy/LivyConf.scala | 16 ++- .../livy/server/CuratorElectorService.scala | 110 ++++++++++++++++++ .../org/apache/livy/server/LivyServer.scala | 30 +++-- 4 files changed, 157 insertions(+), 15 deletions(-) create mode 100644 server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala diff --git a/conf/livy.conf.template b/conf/livy.conf.template index de7c24823..ce75d2800 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -50,12 +50,8 @@ # Enabled to check whether timeout Livy sessions should be stopped. # livy.server.session.timeout-check = true -# -# Whether or not to skip timeout check for a busy session -# livy.server.session.timeout-check.skip-busy = false -# Time in milliseconds on how long Livy will wait before timing out an inactive session. -# Note that the inactive session could be busy running jobs. +# Time in milliseconds on how long Livy will wait before timing out an idle session. # livy.server.session.timeout = 1h # # How long a finished session state should be kept in LivyServer for query. @@ -100,6 +96,16 @@ # on user request and then livy server classpath automatically. # livy.repl.enable-hive-context = +# High Availability mode of Livy. Possible values: +# off: Default. Turn off High Availability. +# on: Livy uses Zookeeper as a state store to ensure a livy server is always available with the +# correct state. +# Must set livy.server.ha.zookeeper-url to configure HA +# livy.server.ha.mode = off + +# For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 +# livy.server.ha.zookeeper-url = + # Recovery mode of Livy. Possible values: # off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions. # recovery: Livy persists session info to the state store. When Livy restarts, it recovers diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index dec8e4aff..e73c1098c 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -184,6 +184,19 @@ object LivyConf { * zookeeper: Store state in a Zookeeper instance. */ val RECOVERY_STATE_STORE = Entry("livy.server.recovery.state-store", null) + + /** + * High Availability mode of Livy. Possible values: + * off: Default. Turn off High Availability. + * on: Livy uses Zookeeper as a state store to ensure a livy server is always available with the + * correct state. + * Must set livy.server.ha.zookeeper-url to configure HA + */ + val HA_MODE = Entry("livy.server.ha.mode", "off") + + // For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 + val HA_ZOOKEEPER_URL = Entry("livy.server.ha.zookeeper-url", "") + /** * For filesystem state store, the path of the state store directory. Please don't use a * filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///. @@ -216,9 +229,6 @@ object LivyConf { // Whether session timeout should be checked, by default it will be checked, which means inactive // session will be stopped after "livy.server.session.timeout" val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true) - // Whether session timeout check should skip busy sessions, if set to true, then busy sessions - // that have jobs running will never timeout. - val SESSION_TIMEOUT_CHECK_SKIP_BUSY = Entry("livy.server.session.timeout-check.skip-busy", false) // How long will an inactive session be gc-ed. val SESSION_TIMEOUT = Entry("livy.server.session.timeout", "1h") // How long a finished session state will be kept in memory diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala new file mode 100644 index 000000000..fb1b8d7cd --- /dev/null +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -0,0 +1,110 @@ +package org.apache.livy.server + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.curator.framework.recipes.leader.LeaderLatch; + +//import org.apache.curator.x.async.AsyncCuratorFramework; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes + +import org.apache.livy.{LivyConf, Logging} +import org.apache.livy.LivyConf.Entry + +object CuratorElectorService { + val HA_KEY_PREFIX_CONF = Entry("livy.server.ha.key-prefix", "livy_ha") + val HA_RETRY_CONF = Entry("livy.server.ha.retry-policy", "5,100") +} + +class CuratorElectorService(livyConf:LivyConf, livyServer: LivyServer) extends LeaderLatchListener { + + import CuratorElectorService._ + + val haAddress = livyConf.get(LivyConf.HA_ZOOKEEPER_URL) + require(!haAddress.isEmpty, s"Please config ${LivyConf.HA_ZOOKEEPER_URL.key}.") + val haKeyPrefix = livyConf.get(HA_KEY_PREFIX_CONF) + val retryValue = livyConf.get(HA_RETRY_CONF) + // a regex to match patterns like "m, n" where m and n both are integer values + val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r + val retryPolicy = retryValue match { + case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt) + case _ => throw new IllegalArgumentException( + s"$HA_KEY_PREFIX_CONF contains bad value: $retryValue. " + + "Correct format is ,. e.g. 5,100") + } + + val client: CuratorFramework = CuratorFrameworkFactory.newClient(haAddress, retryPolicy) + val leaderKey = "/$haKeyPrefix/leader" + + var server : LivyServer = livyServer + + var leaderLatch = new LeaderLatch(client, leaderKey) + leaderLatch.addListener(this) + + object HAState extends Enumeration{ + type HAState = Value + val Active, Standby = Value + } + var currentState = HAState.Standby + + def isLeader() { + transitionToActive(); + } + + def notLeader(){ + transitionToStandby(); + } + + def start():Unit = { + transitionToStandby(); + + client.start() + leaderLatch.start() + + leaderLatch.await() + //We are the leader now join the webserver to the main thread + System.out.println("starting join") + server.join() + System.out.println("join completed?") + + close() + } + + + + def close():Unit = { + transitionToStandby(); + leaderLatch.close(); + } + + def transitionToActive():Unit = { + System.out.println("Transitioning to Active state") + if(currentState == HAState.Active){ + System.out.println("Already in Active State"); + } + else{ + server.start() + currentState = HAState.Active + System.out.println("Transition complete"); + } + } + + def transitionToStandby():Unit = { + System.out.println("Transitioning to Standby state") + if(currentState == HAState.Standby){ + System.out.println("Already in Standby State"); + } + else { + server.stop(); + currentState = HAState.Standby + System.out.println("Transition complete"); + } + } +} \ No newline at end of file diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 60f3961dc..01ebaace7 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -38,13 +38,15 @@ import org.scalatra.servlet.{MultipartConfig, ServletApiImplicits} import org.apache.livy._ import org.apache.livy.server.batch.BatchSessionServlet import org.apache.livy.server.interactive.InteractiveSessionServlet -import org.apache.livy.server.recovery.{SessionStore, StateStore} +import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperStateStore} import org.apache.livy.server.ui.UIServlet import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager} import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF import org.apache.livy.utils.LivySparkUtils._ import org.apache.livy.utils.SparkYarnApp +import org.apache.curator.utils.CloseableUtils; + class LivyServer extends Logging { import LivyConf._ @@ -394,16 +396,30 @@ class LivyServer extends Logging { } } +object HighAvailabilitySettings { + val HA_ON = "on" + val HA_OFF = "off" +} + object LivyServer { def main(args: Array[String]): Unit = { val server = new LivyServer() - try { - server.start() - server.join() - } finally { - server.stop() + val livyConf = new LivyConf().loadFromFile("livy.conf") + + //Test code for Livy HA implementation + if(livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON){ + info("Starting HA connection") + val electorService: CuratorElectorService = new CuratorElectorService(livyConf, server); + electorService.start() + } + else { + try { + server.start() + server.join() + } finally { + server.stop() + } } } - } From 1a2bd914e011a083754c2209051697fa2203ffbb Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Mon, 26 Aug 2019 00:36:21 -0700 Subject: [PATCH 02/28] Fixed error with the conf --- conf/livy.conf.template | 2 ++ 1 file changed, 2 insertions(+) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index de7c24823..b851da353 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -50,12 +50,14 @@ # Enabled to check whether timeout Livy sessions should be stopped. # livy.server.session.timeout-check = true + # # Whether or not to skip timeout check for a busy session # livy.server.session.timeout-check.skip-busy = false # Time in milliseconds on how long Livy will wait before timing out an inactive session. # Note that the inactive session could be busy running jobs. +# Time in milliseconds on how long Livy will wait before timing out an idle session. # livy.server.session.timeout = 1h # # How long a finished session state should be kept in LivyServer for query. From 945f5ce3faa8d11a7bf9b1506d9d0a9ccf691723 Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Mon, 26 Aug 2019 00:40:30 -0700 Subject: [PATCH 03/28] formatting fixes --- conf/livy.conf.template | 2 -- 1 file changed, 2 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 16e852b30..5d3b71554 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -50,14 +50,12 @@ # Enabled to check whether timeout Livy sessions should be stopped. # livy.server.session.timeout-check = true - # # Whether or not to skip timeout check for a busy session # livy.server.session.timeout-check.skip-busy = false # Time in milliseconds on how long Livy will wait before timing out an inactive session. # Note that the inactive session could be busy running jobs. -# Time in milliseconds on how long Livy will wait before timing out an idle session. # livy.server.session.timeout = 1h # # How long a finished session state should be kept in LivyServer for query. From de6f8856ac91db5a289a3aa12724934d262b1b55 Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Mon, 26 Aug 2019 13:21:03 -0700 Subject: [PATCH 04/28] added back another missing configuration --- server/src/main/scala/org/apache/livy/LivyConf.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index e73c1098c..bdd18563f 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -229,6 +229,9 @@ object LivyConf { // Whether session timeout should be checked, by default it will be checked, which means inactive // session will be stopped after "livy.server.session.timeout" val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true) + // Whether session timeout check should skip busy sessions, if set to true, then busy sessions + // that have jobs running will never timeout. + val SESSION_TIMEOUT_CHECK_SKIP_BUSY = Entry("livy.server.session.timeout-check.skip-busy", false) // How long will an inactive session be gc-ed. val SESSION_TIMEOUT = Entry("livy.server.session.timeout", "1h") // How long a finished session state will be kept in memory From 048c1a2fcf1475097f28e84c39ce0bc4350b7fd7 Mon Sep 17 00:00:00 2001 From: roliu Date: Mon, 26 Aug 2019 17:56:04 -0700 Subject: [PATCH 05/28] fixed style errors --- .../main/scala/org/apache/livy/LivyConf.scala | 2 +- .../livy/server/CuratorElectorService.scala | 81 +++++++++++-------- .../org/apache/livy/server/LivyServer.scala | 8 +- 3 files changed, 51 insertions(+), 40 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index bdd18563f..6b2d041c6 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -187,7 +187,7 @@ object LivyConf { /** * High Availability mode of Livy. Possible values: - * off: Default. Turn off High Availability. + * off: Default. Turn off High Availability. * on: Livy uses Zookeeper as a state store to ensure a livy server is always available with the * correct state. * Must set livy.server.ha.zookeeper-url to configure HA diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index fb1b8d7cd..97cf46a27 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -1,18 +1,31 @@ -package org.apache.livy.server - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.leader.LeaderLatchListener; -import org.apache.curator.framework.recipes.leader.LeaderLatch; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -//import org.apache.curator.x.async.AsyncCuratorFramework; +package org.apache.livy.server -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.io.Closeable +import java.io.IOException +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.CuratorFrameworkFactory +import org.apache.curator.framework.recipes.leader.LeaderLatch +import org.apache.curator.framework.recipes.leader.LeaderLatchListener import org.apache.curator.retry.RetryNTimes import org.apache.livy.{LivyConf, Logging} @@ -23,7 +36,10 @@ object CuratorElectorService { val HA_RETRY_CONF = Entry("livy.server.ha.retry-policy", "5,100") } -class CuratorElectorService(livyConf:LivyConf, livyServer: LivyServer) extends LeaderLatchListener { +class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer) + extends LeaderLatchListener + with Logging +{ import CuratorElectorService._ @@ -57,54 +73,51 @@ class CuratorElectorService(livyConf:LivyConf, livyServer: LivyServer) extends L def isLeader() { transitionToActive(); } - + def notLeader(){ transitionToStandby(); } - def start():Unit = { - transitionToStandby(); + def start() : Unit = { + transitionToStandby() client.start() leaderLatch.start() - leaderLatch.await() - //We are the leader now join the webserver to the main thread - System.out.println("starting join") - server.join() - System.out.println("join completed?") + // This instance is now the leader. Joining the webserver to the main thread + info("starting join") + server.join() + info("join completed?") close() } - - - def close():Unit = { + def close() : Unit = { transitionToStandby(); leaderLatch.close(); } - def transitionToActive():Unit = { - System.out.println("Transitioning to Active state") + def transitionToActive() : Unit = { + info("Transitioning to Active state") if(currentState == HAState.Active){ - System.out.println("Already in Active State"); + info("Already in Active State") } - else{ + else { server.start() currentState = HAState.Active - System.out.println("Transition complete"); + info("Transition complete") } } - def transitionToStandby():Unit = { - System.out.println("Transitioning to Standby state") + def transitionToStandby() : Unit = { + info("Transitioning to Standby state") if(currentState == HAState.Standby){ - System.out.println("Already in Standby State"); + info("Already in Standby State"); } else { server.stop(); currentState = HAState.Standby - System.out.println("Transition complete"); + info("Transition complete"); } } -} \ No newline at end of file +} diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 01ebaace7..40ece22f5 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -27,6 +27,7 @@ import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future +import org.apache.curator.utils.CloseableUtils import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation} import org.apache.hadoop.security.authentication.server._ import org.eclipse.jetty.servlet.FilterHolder @@ -45,8 +46,6 @@ import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF import org.apache.livy.utils.LivySparkUtils._ import org.apache.livy.utils.SparkYarnApp -import org.apache.curator.utils.CloseableUtils; - class LivyServer extends Logging { import LivyConf._ @@ -407,11 +406,10 @@ object LivyServer { val server = new LivyServer() val livyConf = new LivyConf().loadFromFile("livy.conf") - //Test code for Livy HA implementation if(livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON){ info("Starting HA connection") - val electorService: CuratorElectorService = new CuratorElectorService(livyConf, server); - electorService.start() + val electorService: CuratorElectorService = new CuratorElectorService(livyConf, server) + electorService.start() } else { try { From 31df79a262bbd8347ae3606e4e14c1f5bb23db1b Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Tue, 27 Aug 2019 12:42:17 -0700 Subject: [PATCH 06/28] spelling comment resolution --- .../scala/org/apache/livy/server/CuratorElectorService.scala | 2 +- .../org/apache/livy/server/recovery/ZooKeeperStateStore.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index 97cf46a27..767a1dfe1 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -44,7 +44,7 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer) import CuratorElectorService._ val haAddress = livyConf.get(LivyConf.HA_ZOOKEEPER_URL) - require(!haAddress.isEmpty, s"Please config ${LivyConf.HA_ZOOKEEPER_URL.key}.") + require(!haAddress.isEmpty, s"Please configure ${LivyConf.HA_ZOOKEEPER_URL.key}.") val haKeyPrefix = livyConf.get(HA_KEY_PREFIX_CONF) val retryValue = livyConf.get(HA_RETRY_CONF) // a regex to match patterns like "m, n" where m and n both are integer values diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala index ec6b9df18..6affae462 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala @@ -46,7 +46,7 @@ class ZooKeeperStateStore( } private val zkAddress = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL) - require(!zkAddress.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.") + require(!zkAddress.isEmpty, s"Please configure ${LivyConf.RECOVERY_STATE_STORE_URL.key}.") private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF) private val retryValue = livyConf.get(ZK_RETRY_CONF) // a regex to match patterns like "m, n" where m and n both are integer values From dd5da8b5d7249efb75fc5be5fb9cd2db4b3f12f8 Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Wed, 28 Aug 2019 11:20:21 -0700 Subject: [PATCH 07/28] fixed string format error --- .../scala/org/apache/livy/server/CuratorElectorService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index 767a1dfe1..0f3c308ed 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -57,7 +57,7 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer) } val client: CuratorFramework = CuratorFrameworkFactory.newClient(haAddress, retryPolicy) - val leaderKey = "/$haKeyPrefix/leader" + val leaderKey = s"/$haKeyPrefix/leader" var server : LivyServer = livyServer From c73bdea2359a3cb2ea501d92bf6d7aa6bba40304 Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Thu, 5 Sep 2019 14:49:48 -0700 Subject: [PATCH 08/28] reverted spelling change and addressed threading code comment --- .../apache/livy/server/CuratorElectorService.scala | 11 +++++------ .../livy/server/recovery/ZooKeeperStateStore.scala | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index 0f3c308ed..8ebc16b8d 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -83,13 +83,12 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer) client.start() leaderLatch.start() - leaderLatch.await() - // This instance is now the leader. Joining the webserver to the main thread - info("starting join") - server.join() - info("join completed?") - close() + try { + Thread.currentThread.join() + }finally { + transitionToStandby() + } } def close() : Unit = { diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala index 6affae462..ec6b9df18 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala @@ -46,7 +46,7 @@ class ZooKeeperStateStore( } private val zkAddress = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL) - require(!zkAddress.isEmpty, s"Please configure ${LivyConf.RECOVERY_STATE_STORE_URL.key}.") + require(!zkAddress.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.") private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF) private val retryValue = livyConf.get(ZK_RETRY_CONF) // a regex to match patterns like "m, n" where m and n both are integer values From b8a4d1732afe16c6a083ebf52dfa00008d497304 Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Thu, 5 Sep 2019 14:54:35 -0700 Subject: [PATCH 09/28] style fix --- .../scala/org/apache/livy/server/CuratorElectorService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index 8ebc16b8d..c3ec0e58c 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -86,7 +86,7 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer) try { Thread.currentThread.join() - }finally { + } finally { transitionToStandby() } } From af0cf82c3f24fa54617fa0db724eb782405a4a7f Mon Sep 17 00:00:00 2001 From: roliu Date: Thu, 19 Sep 2019 14:44:48 -0700 Subject: [PATCH 10/28] style fixes --- .../apache/livy/server/CuratorElectorService.scala | 14 +++++++------- .../scala/org/apache/livy/server/LivyServer.scala | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index c3ec0e58c..c7f15b74f 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -74,11 +74,11 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer) transitionToActive(); } - def notLeader(){ + def notLeader() { transitionToStandby(); } - def start() : Unit = { + def start(): Unit = { transitionToStandby() client.start() @@ -91,14 +91,14 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer) } } - def close() : Unit = { + def close(): Unit = { transitionToStandby(); leaderLatch.close(); } - def transitionToActive() : Unit = { + def transitionToActive(): Unit = { info("Transitioning to Active state") - if(currentState == HAState.Active){ + if(currentState == HAState.Active) { info("Already in Active State") } else { @@ -108,9 +108,9 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer) } } - def transitionToStandby() : Unit = { + def transitionToStandby(): Unit = { info("Transitioning to Standby state") - if(currentState == HAState.Standby){ + if(currentState == HAState.Standby) { info("Already in Standby State"); } else { diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 40ece22f5..f856eb9ec 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -406,7 +406,7 @@ object LivyServer { val server = new LivyServer() val livyConf = new LivyConf().loadFromFile("livy.conf") - if(livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON){ + if(livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON) { info("Starting HA connection") val electorService: CuratorElectorService = new CuratorElectorService(livyConf, server) electorService.start() From 19e266bb06bdb460d0037b182e4eebc7985fdaf4 Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Tue, 10 Mar 2020 16:43:46 -0700 Subject: [PATCH 11/28] added a unit test spec suite for the curator elector service --- .../livy/server/CuratorElectorService.scala | 43 +++++---- .../recovery/CuratorElectorServiceSpec.scala | 90 +++++++++++++++++++ 2 files changed, 115 insertions(+), 18 deletions(-) create mode 100644 server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index c7f15b74f..24f01041f 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -36,12 +36,20 @@ object CuratorElectorService { val HA_RETRY_CONF = Entry("livy.server.ha.retry-policy", "5,100") } -class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer) +object HAState extends Enumeration{ + type HAState = Value + val Active, Standby = Value +} + + +class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, + mockCuratorClient: Option[CuratorFramework] = None, mockLeaderLatch: Option[LeaderLatch] = None) // For testing) extends LeaderLatchListener with Logging { import CuratorElectorService._ + import HAState._ val haAddress = livyConf.get(LivyConf.HA_ZOOKEEPER_URL) require(!haAddress.isEmpty, s"Please configure ${LivyConf.HA_ZOOKEEPER_URL.key}.") @@ -56,26 +64,25 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer) "Correct format is ,. e.g. 5,100") } - val client: CuratorFramework = CuratorFrameworkFactory.newClient(haAddress, retryPolicy) - val leaderKey = s"/$haKeyPrefix/leader" - var server : LivyServer = livyServer - var leaderLatch = new LeaderLatch(client, leaderKey) - leaderLatch.addListener(this) + val client: CuratorFramework = mockCuratorClient.getOrElse { + CuratorFrameworkFactory.newClient(haAddress, retryPolicy) + } + val leaderKey = s"/$haKeyPrefix/leader" - object HAState extends Enumeration{ - type HAState = Value - val Active, Standby = Value + var leaderLatch = mockLeaderLatch.getOrElse { + new LeaderLatch(client, leaderKey) } - var currentState = HAState.Standby + leaderLatch.addListener(this) + var currentState = HAState.Standby def isLeader() { - transitionToActive(); + transitionToActive() } - def notLeader() { - transitionToStandby(); + def notLeader(){ + transitionToStandby() } def start(): Unit = { @@ -92,8 +99,8 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer) } def close(): Unit = { - transitionToStandby(); - leaderLatch.close(); + transitionToStandby() + leaderLatch.close() } def transitionToActive(): Unit = { @@ -111,12 +118,12 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer) def transitionToStandby(): Unit = { info("Transitioning to Standby state") if(currentState == HAState.Standby) { - info("Already in Standby State"); + info("Already in Standby State") } else { - server.stop(); + server.stop() currentState = HAState.Standby - info("Transition complete"); + info("Transition complete") } } } diff --git a/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala new file mode 100644 index 000000000..ff2d75a2c --- /dev/null +++ b/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.server.recovery + +import scala.collection.JavaConverters._ + +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.api._ +import org.apache.curator.framework.listen.Listenable +import org.apache.curator.framework.recipes.leader.LeaderLatch +import org.apache.zookeeper.data.Stat +import org.mockito.Mockito._ +import org.scalatest.FunSpec +import org.scalatest.Matchers._ +import org.scalatest.mock.MockitoSugar.mock + +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} +import org.apache.livy.server.CuratorElectorService +import org.apache.livy.server.HAState +import org.apache.livy.server.LivyServer + +class CuratorElectorServiceSpec extends FunSpec with LivyBaseUnitTestSuite { + describe("CuratorElectorService") { + case class TestFixture(electorService: CuratorElectorService) + val conf = new LivyConf() + conf.set(LivyConf.HA_ZOOKEEPER_URL, "host") + + //Need to create mock leader latches and their associated functions + def withMock[R](testBody: TestFixture => R): R = { + val curatorClient = mock[CuratorFramework] + when(curatorClient.getUnhandledErrorListenable()) + .thenReturn(mock[Listenable[UnhandledErrorListener]]) + val leaderLatch = mock[LeaderLatch] + + val server = mock[LivyServer] + val electorService = new CuratorElectorService(conf, server, Some(curatorClient), Some(leaderLatch)) + + testBody(TestFixture(electorService)) + } + + it("should not start the server until it acquires leadership") { + withMock { f => + f.electorService.currentState shouldBe HAState.Standby + verify(f.electorService.server, times(0)).start() + } + } + + it("should start the livy server after acquiring leadership") { + withMock { f => + f.electorService.isLeader() + f.electorService.currentState shouldBe HAState.Active + verify(f.electorService.server, times(1)).start() + } + } + + it("should stop the Livy Server if it loses leadership") { + withMock { f => + f.electorService.isLeader() + f.electorService.notLeader() + f.electorService.currentState shouldBe HAState.Standby + verify(f.electorService.server, times(1)).stop() + } + } + + it("should start a new Livy Server after reacquiring leadership") { + withMock { f => + f.electorService.isLeader() + f.electorService.notLeader() + f.electorService.isLeader() + f.electorService.currentState shouldBe HAState.Active + verify(f.electorService.server, times(2)).start() + } + } + } +} From 972ca3d4e2806beae4f8a18d2eb159c81221120d Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Tue, 10 Mar 2020 17:04:26 -0700 Subject: [PATCH 12/28] retooled livy HA to also do domain redirection --- conf/livy.conf.template | 9 +++ server/pom.xml | 6 ++ .../main/scala/org/apache/livy/LivyConf.scala | 9 +++ .../livy/server/CuratorElectorService.scala | 25 ++++++-- .../livy/server/DomainRedirectionFilter.scala | 63 +++++++++++++++++++ .../org/apache/livy/server/LivyServer.scala | 54 +++++++++++++--- .../apache/livy/sessions/SessionManager.scala | 11 ++-- 7 files changed, 159 insertions(+), 18 deletions(-) create mode 100644 server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 585f674c7..9d5385801 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -110,6 +110,15 @@ # For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 # livy.server.ha.zookeeper-url = +# The id of the current livy server +# livy.server.ha.server-id = + +# The list of ids for all livy servers used for HA +# livy.server.ha.server-ids = + +# The list of server addresses for all livy servers used for HA +# livy.server.ha.server-addresses = + # Recovery mode of Livy. Possible values: # off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions. # recovery: Livy persists session info to the state store. When Livy restarts, it recovers diff --git a/server/pom.xml b/server/pom.xml index bca185327..55603f0bd 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -251,6 +251,12 @@ test + + org.springframework + spring-web + 5.2.0.RELEASE + + org.scalatra scalatra-test_${scala.binary.version} diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 31b5d824a..737983b46 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -209,6 +209,15 @@ object LivyConf { // For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 val HA_ZOOKEEPER_URL = Entry("livy.server.ha.zookeeper-url", "") + // The id of the current livy server + val HA_SERVER_ID = Entry("livy.server.ha.server-id", "") + + // The ids of all servers used in HA + val HA_SERVER_IDS = Entry("livy.server.ha.server-ids", "") + + // The endpoints of all servers used in HA + val HA_SERVER_ENDPOINTS = Entry("livy.server.ha.server-addresses", "") + /** * For filesystem state store, the path of the state store directory. Please don't use a * filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///. diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index 24f01041f..d4f540534 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -19,6 +19,7 @@ package org.apache.livy.server import java.io.Closeable import java.io.IOException +import java.net.InetAddress import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger @@ -47,7 +48,6 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, extends LeaderLatchListener with Logging { - import CuratorElectorService._ import HAState._ @@ -71,8 +71,11 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, } val leaderKey = s"/$haKeyPrefix/leader" + val leaderIds = livyConf.configToSeq(LivyConf.HA_SERVER_IDS) + val leaderEndpoints = livyConf.configToSeq(LivyConf.HA_SERVER_ENDPOINTS) + var leaderLatch = mockLeaderLatch.getOrElse { - new LeaderLatch(client, leaderKey) + new LeaderLatch(client, leaderKey, getCurrentId()) } leaderLatch.addListener(this) @@ -85,9 +88,24 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, transitionToStandby() } + def getCurrentId(): String = { + val currentEndpoint = java.net.InetAddress.getLocalHost().getHostName(); + info(currentEndpoint) + info(leaderEndpoints) + val currentId = leaderIds(leaderEndpoints indexOf currentEndpoint) + currentId + } + + def getActiveEndpoint(): String = { + val activeLeaderId = leaderLatch.getLeader().getId() + val activeEndpoint = leaderEndpoints(leaderIds indexOf activeLeaderId) + activeEndpoint + } + def start(): Unit = { transitionToStandby() + server.start() client.start() leaderLatch.start() @@ -109,7 +127,7 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, info("Already in Active State") } else { - server.start() + server.restart() currentState = HAState.Active info("Transition complete") } @@ -121,7 +139,6 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, info("Already in Standby State") } else { - server.stop() currentState = HAState.Standby info("Transition complete") } diff --git a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala new file mode 100644 index 000000000..1c768a9af --- /dev/null +++ b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala @@ -0,0 +1,63 @@ +package org.apache.livy.server + +import java.io.IOException; +import java.net.URL; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletRequestWrapper; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; +import javax.servlet.http.HttpServletResponse; + +import org.springframework.web.util.UriComponentsBuilder + +import org.apache.livy.{LivyConf, Logging} + +class DomainRedirectionFilter(HAService: CuratorElectorService) extends Filter + with Logging +{ + + val METHODS_TO_IGNORE = Set("GET", "OPTIONS", "HEAD"); + + val HEADER_NAME = "X-Requested-By"; + + def isLeader(): Boolean = { + HAService.currentState == HAState.Active + } + + override def init(filterConfig: FilterConfig): Unit = {} + + override def doFilter(request: ServletRequest, + response: ServletResponse, + chain: FilterChain): Unit = { + info("active leader is:" + HAService.getActiveEndpoint()) + info("current id:" + HAService.getCurrentId()) + if (!isLeader()) { + val httpRequest = request.asInstanceOf[HttpServletRequest] + val requestURL = httpRequest.getRequestURL().toString() + info(requestURL) + + val builder = UriComponentsBuilder.fromHttpUrl(requestURL) + val redirectURL = builder.host(HAService.getActiveEndpoint()).toUriString(); + info(redirectURL) + + val httpServletResponse = response.asInstanceOf[HttpServletResponse]; + val redirectMsg = "This is a standby Livy Instance. The redirect url is: " + redirectURL + val out = httpServletResponse.getWriter(); + out.println(redirectMsg); + + httpServletResponse.setHeader("Location", redirectURL); + httpServletResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + } else { + chain.doFilter(request, response); + } + } + + override def destroy(): Unit = {} +} \ No newline at end of file diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 1716a5855..98445e57f 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -63,9 +63,13 @@ class LivyServer extends Logging { private var zkManager: Option[ZooKeeperManager] = None + private var interactiveSessionManager: InteractiveSessionManager = _ + private var batchSessionManager: BatchSessionManager = _ + private var sessionStore: SessionStore = _ + private var ugi: UserGroupInformation = _ - def start(): Unit = { + def init(): Unit = { livyConf = new LivyConf().loadFromFile("livy.conf") accessManager = new AccessManager(livyConf) @@ -153,11 +157,11 @@ class LivyServer extends Logging { zkManager = Some(new ZooKeeperManager(livyConf)) zkManager.foreach(_.start()) } - + StateStore.init(livyConf, zkManager) - val sessionStore = new SessionStore(livyConf) - val batchSessionManager = new BatchSessionManager(livyConf, sessionStore) - val interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore) + sessionStore = new SessionStore(livyConf) + batchSessionManager = new BatchSessionManager(livyConf, sessionStore) + interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore) server = new WebServer(livyConf, host, port) server.context.setResourceBase("src/main/org/apache/livy/server") @@ -321,7 +325,29 @@ class LivyServer extends Logging { val accessHolder = new FilterHolder(new AccessFilter(accessManager)) server.context.addFilter(accessHolder, "/*", EnumSet.allOf(classOf[DispatcherType])) } + } + + def initHa(electorService: CuratorElectorService): Unit = { + //Start server HA leader election service if applicable + if(livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON){ + info("Starting HA connection") + //val thread = new Thread { + // override def run { + // electorService.start() + // } + //} + //thread.start + + val redirectHolder = new FilterHolder(new DomainRedirectionFilter(electorService)) + server.context.addFilter(redirectHolder, "/*", EnumSet.allOf(classOf[DispatcherType])) + } + } + + def start(): Unit = { + info("Starting HA connection") + interactiveSessionManager.startSessionManager() + batchSessionManager.startSessionManager() server.start() _thriftServerFactory.foreach { @@ -330,10 +356,7 @@ class LivyServer extends Logging { Runtime.getRuntime().addShutdownHook(new Thread("Livy Server Shutdown") { override def run(): Unit = { - info("Shutting down Livy server.") - zkManager.foreach(_.stop()) - server.stop() - _thriftServerFactory.foreach(_.stop()) + stop() } }) @@ -392,10 +415,19 @@ class LivyServer extends Logging { def stop(): Unit = { if (server != null) { - server.stop() + info("Shutting down Livy server.") + zkManager.foreach(_.stop()) + server.stop() + _thriftServerFactory.foreach(_.stop()) } } + def restart(): Unit = + { + stop() + start() + } + def serverUrl(): String = { _serverUrl.getOrElse(throw new IllegalStateException("Server not yet started.")) } @@ -435,9 +467,11 @@ object LivyServer { val server = new LivyServer() val livyConf = new LivyConf().loadFromFile("livy.conf") + server.init() if(livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON) { info("Starting HA connection") val electorService: CuratorElectorService = new CuratorElectorService(livyConf, server) + server.initHa(electorService) electorService.start() } else { diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala index f2548ac00..f2734fd57 100644 --- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala +++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala @@ -75,7 +75,6 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( protected[this] final val sessions = mutable.LinkedHashMap[Int, S]() private[this] final val sessionsByName = mutable.HashMap[String, S]() - private[this] final val sessionTimeoutCheck = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK) private[this] final val sessionTimeoutCheckSkipBusy = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK_SKIP_BUSY) @@ -84,9 +83,15 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( private[this] final val sessionStateRetainedInSec = TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_STATE_RETAIN_TIME)) - mockSessions.getOrElse(recover()).foreach(register) new GarbageCollector().start() + def startSessionManager(): Unit = { + idCounter.set(0) + sessions.clear() + sessionsByName.clear() + mockSessions.getOrElse(recover()).foreach(register) + } + def nextId(): Int = synchronized { val id = idCounter.getAndIncrement() sessionStore.saveNextSessionId(sessionType, idCounter.get()) @@ -209,7 +214,5 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( Thread.sleep(60 * 1000) } } - } - } From fdf449d0c7ad92aff2087dcff0b4e62454a3302f Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Wed, 11 Mar 2020 15:58:05 -0700 Subject: [PATCH 13/28] updated specs to account for the start()/init() split --- .../server/recovery/CuratorElectorServiceSpec.scala | 11 +++++------ .../org/apache/livy/sessions/SessionManagerSpec.scala | 4 ++++ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala index ff2d75a2c..a74b6e6a3 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala @@ -60,30 +60,29 @@ class CuratorElectorServiceSpec extends FunSpec with LivyBaseUnitTestSuite { } } - it("should start the livy server after acquiring leadership") { + it("should restart the livy server after acquiring leadership") { withMock { f => f.electorService.isLeader() f.electorService.currentState shouldBe HAState.Active - verify(f.electorService.server, times(1)).start() + verify(f.electorService.server, times(1)).restart() } } - it("should stop the Livy Server if it loses leadership") { + it("should be in standy state if loses leadership") { withMock { f => f.electorService.isLeader() f.electorService.notLeader() f.electorService.currentState shouldBe HAState.Standby - verify(f.electorService.server, times(1)).stop() } } - it("should start a new Livy Server after reacquiring leadership") { + it("should restart the Livy Server again after reacquiring leadership") { withMock { f => f.electorService.isLeader() f.electorService.notLeader() f.electorService.isLeader() f.electorService.currentState shouldBe HAState.Active - verify(f.electorService.server, times(2)).start() + verify(f.electorService.server, times(2)).restart() } } } diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index 7f5e31e26..2c89cb2de 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -232,6 +232,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit .thenReturn(validMetadata ++ invalidMetadata) val sm = new BatchSessionManager(conf, sessionStore) + sm.startSessionManager() sm.nextId() shouldBe nextId validMetadata.foreach { m => sm.get(m.get.id) shouldBe defined @@ -248,6 +249,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit val session = mockSession(sessionId) val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session))) + sm.startSessionManager() sm.get(sessionId) shouldBe defined Await.ready(sm.delete(sessionId).get, 30 seconds) @@ -263,6 +265,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit val session = mockSession(sessionId) val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session))) + sm.startSessionManager() sm.get(sessionId) shouldBe defined sm.shutdown() @@ -278,6 +281,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit val session = mockSession(sessionId) val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session))) + sm.startSessionManager() sm.get(sessionId) shouldBe defined sm.shutdown() From 636ac05db6bdecd23ba00bea6663ffcc6b7e495d Mon Sep 17 00:00:00 2001 From: roliu Date: Wed, 11 Mar 2020 17:11:03 -0700 Subject: [PATCH 14/28] scalastyle fixes --- .../livy/server/CuratorElectorService.scala | 5 +- .../livy/server/DomainRedirectionFilter.scala | 74 ++++++++++++------- .../org/apache/livy/server/LivyServer.scala | 15 +--- .../recovery/CuratorElectorServiceSpec.scala | 11 +-- 4 files changed, 59 insertions(+), 46 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index d4f540534..56d7a442b 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -44,7 +44,8 @@ object HAState extends Enumeration{ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, - mockCuratorClient: Option[CuratorFramework] = None, mockLeaderLatch: Option[LeaderLatch] = None) // For testing) + mockCuratorClient: Option[CuratorFramework] = None, + mockLeaderLatch: Option[LeaderLatch] = None) extends LeaderLatchListener with Logging { @@ -95,7 +96,7 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, val currentId = leaderIds(leaderEndpoints indexOf currentEndpoint) currentId } - + def getActiveEndpoint(): String = { val activeLeaderId = leaderLatch.getLeader().getId() val activeEndpoint = leaderEndpoints(leaderIds indexOf activeLeaderId) diff --git a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala index 1c768a9af..82811517b 100644 --- a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala +++ b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala @@ -1,19 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.livy.server -import java.io.IOException; -import java.net.URL; - -import javax.servlet.Filter; -import javax.servlet.FilterChain; -import javax.servlet.FilterConfig; -import javax.servlet.ServletContext; -import javax.servlet.ServletException; -import javax.servlet.ServletRequest; -import javax.servlet.ServletRequestWrapper; -import javax.servlet.ServletResponse; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletRequestWrapper; -import javax.servlet.http.HttpServletResponse; +import java.io.IOException +import java.net.URL +import javax.servlet.Filter +import javax.servlet.FilterChain +import javax.servlet.FilterConfig +import javax.servlet.ServletContext +import javax.servlet.ServletException +import javax.servlet.ServletRequest +import javax.servlet.ServletRequestWrapper +import javax.servlet.ServletResponse +import javax.servlet.http.HttpServletRequest +import javax.servlet.http.HttpServletRequestWrapper +import javax.servlet.http.HttpServletResponse import org.springframework.web.util.UriComponentsBuilder @@ -23,9 +39,9 @@ class DomainRedirectionFilter(HAService: CuratorElectorService) extends Filter with Logging { - val METHODS_TO_IGNORE = Set("GET", "OPTIONS", "HEAD"); + val METHODS_TO_IGNORE = Set("GET", "OPTIONS", "HEAD") - val HEADER_NAME = "X-Requested-By"; + val HEADER_NAME = "X-Requested-By" def isLeader(): Boolean = { HAService.currentState == HAState.Active @@ -40,24 +56,26 @@ class DomainRedirectionFilter(HAService: CuratorElectorService) extends Filter info("current id:" + HAService.getCurrentId()) if (!isLeader()) { val httpRequest = request.asInstanceOf[HttpServletRequest] - val requestURL = httpRequest.getRequestURL().toString() - info(requestURL) + val requestURL = httpRequest.getRequestURL().toString() + info(requestURL) val builder = UriComponentsBuilder.fromHttpUrl(requestURL) - val redirectURL = builder.host(HAService.getActiveEndpoint()).toUriString(); + val redirectURL = builder.host(HAService.getActiveEndpoint()).toUriString() info(redirectURL) - val httpServletResponse = response.asInstanceOf[HttpServletResponse]; - val redirectMsg = "This is a standby Livy Instance. The redirect url is: " + redirectURL - val out = httpServletResponse.getWriter(); - out.println(redirectMsg); + val httpServletResponse = response.asInstanceOf[HttpServletResponse]; + val redirectMsg = "This is a standby Livy Instance. The redirect url is: " + redirectURL + val out = httpServletResponse.getWriter() + // scalastyle:off println + out.println(redirectMsg) + // scalastyle:on println - httpServletResponse.setHeader("Location", redirectURL); - httpServletResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + httpServletResponse.setHeader("Location", redirectURL) + httpServletResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT) } else { - chain.doFilter(request, response); + chain.doFilter(request, response); } } - + override def destroy(): Unit = {} -} \ No newline at end of file +} diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 98445e57f..93341bb5f 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -18,7 +18,7 @@ package org.apache.livy.server import java.io.{BufferedInputStream, InputStream} -import java.net.InetAddress +0import java.net.InetAddress import java.util.concurrent._ import java.util.EnumSet import javax.servlet._ @@ -157,7 +157,7 @@ class LivyServer extends Logging { zkManager = Some(new ZooKeeperManager(livyConf)) zkManager.foreach(_.start()) } - + StateStore.init(livyConf, zkManager) sessionStore = new SessionStore(livyConf) batchSessionManager = new BatchSessionManager(livyConf, sessionStore) @@ -328,17 +328,10 @@ class LivyServer extends Logging { } def initHa(electorService: CuratorElectorService): Unit = { - //Start server HA leader election service if applicable - if(livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON){ + // Start server HA leader election service if applicable + if(livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON) { info("Starting HA connection") - //val thread = new Thread { - // override def run { - // electorService.start() - // } - //} - //thread.start - val redirectHolder = new FilterHolder(new DomainRedirectionFilter(electorService)) server.context.addFilter(redirectHolder, "/*", EnumSet.allOf(classOf[DispatcherType])) } diff --git a/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala index a74b6e6a3..8c90cb2ac 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala @@ -40,7 +40,7 @@ class CuratorElectorServiceSpec extends FunSpec with LivyBaseUnitTestSuite { val conf = new LivyConf() conf.set(LivyConf.HA_ZOOKEEPER_URL, "host") - //Need to create mock leader latches and their associated functions + // Need to create mock leader latches and their associated functions def withMock[R](testBody: TestFixture => R): R = { val curatorClient = mock[CuratorFramework] when(curatorClient.getUnhandledErrorListenable()) @@ -48,11 +48,12 @@ class CuratorElectorServiceSpec extends FunSpec with LivyBaseUnitTestSuite { val leaderLatch = mock[LeaderLatch] val server = mock[LivyServer] - val electorService = new CuratorElectorService(conf, server, Some(curatorClient), Some(leaderLatch)) - + val electorService = new CuratorElectorService(conf, server, + Some(curatorClient), Some(leaderLatch)) + testBody(TestFixture(electorService)) } - + it("should not start the server until it acquires leadership") { withMock { f => f.electorService.currentState shouldBe HAState.Standby @@ -82,7 +83,7 @@ class CuratorElectorServiceSpec extends FunSpec with LivyBaseUnitTestSuite { f.electorService.notLeader() f.electorService.isLeader() f.electorService.currentState shouldBe HAState.Active - verify(f.electorService.server, times(2)).restart() + verify(f.electorService.server, times(2)).restart() } } } From 2423e4d5388510306cbbba59aae689f9f843884c Mon Sep 17 00:00:00 2001 From: roliu Date: Wed, 11 Mar 2020 17:18:47 -0700 Subject: [PATCH 15/28] fixed typo --- server/src/main/scala/org/apache/livy/server/LivyServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 93341bb5f..5ea153da0 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -18,7 +18,7 @@ package org.apache.livy.server import java.io.{BufferedInputStream, InputStream} -0import java.net.InetAddress +import java.net.InetAddress import java.util.concurrent._ import java.util.EnumSet import javax.servlet._ From 14741ebad0cdc387aeeeaface8b030f97b72f667 Mon Sep 17 00:00:00 2001 From: roliu Date: Wed, 11 Mar 2020 17:57:44 -0700 Subject: [PATCH 16/28] fixed remaining scalastyle error --- server/src/main/scala/org/apache/livy/server/LivyServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 5ea153da0..ae9a51fba 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -329,7 +329,7 @@ class LivyServer extends Logging { def initHa(electorService: CuratorElectorService): Unit = { // Start server HA leader election service if applicable - if(livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON) { + if (livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON) { info("Starting HA connection") val redirectHolder = new FilterHolder(new DomainRedirectionFilter(electorService)) From de250f53efed80e6a45ef9b891c275961a21e348 Mon Sep 17 00:00:00 2001 From: roliu Date: Fri, 13 Mar 2020 14:40:11 -0700 Subject: [PATCH 17/28] updated minicluster.scala to reflect the init() and start() split --- .../main/scala/org/apache/livy/test/framework/MiniCluster.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala index 2827dceab..28c4dc103 100644 --- a/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala +++ b/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala @@ -154,6 +154,7 @@ object MiniLivyMain extends MiniClusterBase { saveProperties(livyConf, new File(configPath + "/livy.conf")) val server = new LivyServer() + server.init() server.start() server.livyConf.set(LivyConf.ENABLE_HIVE_CONTEXT, false) // Write a serverUrl.conf file to the conf directory with the location of the Livy From 988c2192703ed7520a1af93b20f61ebbb4c6d079 Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Tue, 17 Mar 2020 14:25:24 -0700 Subject: [PATCH 18/28] refactored code to idenitfy the current server based on hostname and to redirect to the corresponding address --- conf/livy.conf.template | 6 +++--- .../main/scala/org/apache/livy/LivyConf.scala | 12 ++++++------ .../livy/server/CuratorElectorService.scala | 17 +++++++++-------- .../livy/server/DomainRedirectionFilter.scala | 6 +++--- 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 9d5385801..663c88dc6 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -110,12 +110,12 @@ # For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 # livy.server.ha.zookeeper-url = -# The id of the current livy server -# livy.server.ha.server-id = - # The list of ids for all livy servers used for HA # livy.server.ha.server-ids = +# The list of hostnames for all livy servers used for HA +# livy.server.ha.server-hostnames = + # The list of server addresses for all livy servers used for HA # livy.server.ha.server-addresses = diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 737983b46..2d732e3d3 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -201,7 +201,7 @@ object LivyConf { * High Availability mode of Livy. Possible values: * off: Default. Turn off High Availability. * on: Livy uses Zookeeper as a state store to ensure a livy server is always available with the - * correct state. +* correct state. * Must set livy.server.ha.zookeeper-url to configure HA */ val HA_MODE = Entry("livy.server.ha.mode", "off") @@ -209,14 +209,14 @@ object LivyConf { // For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 val HA_ZOOKEEPER_URL = Entry("livy.server.ha.zookeeper-url", "") - // The id of the current livy server - val HA_SERVER_ID = Entry("livy.server.ha.server-id", "") - // The ids of all servers used in HA val HA_SERVER_IDS = Entry("livy.server.ha.server-ids", "") - // The endpoints of all servers used in HA - val HA_SERVER_ENDPOINTS = Entry("livy.server.ha.server-addresses", "") + // The hostnames of all servers used in HA + val HA_SERVER_HOSTNAMES = Entry("livy.server.ha.server-hostnames", "") + + // The addresses of all servers used in HA + val HA_SERVER_ADDRESSES = Entry("livy.server.ha.server-addresses", "") /** * For filesystem state store, the path of the state store directory. Please don't use a diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index 56d7a442b..b43ad404f 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -73,7 +73,8 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, val leaderKey = s"/$haKeyPrefix/leader" val leaderIds = livyConf.configToSeq(LivyConf.HA_SERVER_IDS) - val leaderEndpoints = livyConf.configToSeq(LivyConf.HA_SERVER_ENDPOINTS) + val leaderHostnames = livyConf.configToSeq(LivyConf.HA_SERVER_HOSTNAMES) + val leaderAddresses = livyConf.configToSeq(LivyConf.HA_SERVER_HOSTNAMES) var leaderLatch = mockLeaderLatch.getOrElse { new LeaderLatch(client, leaderKey, getCurrentId()) @@ -90,17 +91,17 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, } def getCurrentId(): String = { - val currentEndpoint = java.net.InetAddress.getLocalHost().getHostName(); - info(currentEndpoint) - info(leaderEndpoints) - val currentId = leaderIds(leaderEndpoints indexOf currentEndpoint) + val currentHostname = java.net.InetAddress.getLocalHost().getHostName(); + info(currentHostname) + info(leaderHostnames) + val currentId = leaderIds(leaderHostnames indexOf currentHostname) currentId } - def getActiveEndpoint(): String = { + def getActiveAddress(): String = { val activeLeaderId = leaderLatch.getLeader().getId() - val activeEndpoint = leaderEndpoints(leaderIds indexOf activeLeaderId) - activeEndpoint + val activeAddress = leaderAddresses(leaderIds indexOf activeLeaderId) + activeAddress } def start(): Unit = { diff --git a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala index 82811517b..fb7870908 100644 --- a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala +++ b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala @@ -52,7 +52,7 @@ class DomainRedirectionFilter(HAService: CuratorElectorService) extends Filter override def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain): Unit = { - info("active leader is:" + HAService.getActiveEndpoint()) + info("active leader is:" + HAService.getActiveAddress()) info("current id:" + HAService.getCurrentId()) if (!isLeader()) { val httpRequest = request.asInstanceOf[HttpServletRequest] @@ -60,12 +60,12 @@ class DomainRedirectionFilter(HAService: CuratorElectorService) extends Filter info(requestURL) val builder = UriComponentsBuilder.fromHttpUrl(requestURL) - val redirectURL = builder.host(HAService.getActiveEndpoint()).toUriString() + val redirectURL = builder.host(HAService.getActiveAddress()).toUriString() info(redirectURL) val httpServletResponse = response.asInstanceOf[HttpServletResponse]; val redirectMsg = "This is a standby Livy Instance. The redirect url is: " + redirectURL - val out = httpServletResponse.getWriter() + val out = httpServletResponse.getWriter()es // scalastyle:off println out.println(redirectMsg) // scalastyle:on println From dd47e370ed3bb60a85a3c22c94b2a810cd4732b8 Mon Sep 17 00:00:00 2001 From: roliu Date: Tue, 17 Mar 2020 14:25:36 -0700 Subject: [PATCH 19/28] fixed typo --- .../scala/org/apache/livy/server/DomainRedirectionFilter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala index fb7870908..ef820046c 100644 --- a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala +++ b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala @@ -65,7 +65,7 @@ class DomainRedirectionFilter(HAService: CuratorElectorService) extends Filter val httpServletResponse = response.asInstanceOf[HttpServletResponse]; val redirectMsg = "This is a standby Livy Instance. The redirect url is: " + redirectURL - val out = httpServletResponse.getWriter()es + val out = httpServletResponse.getWriter() // scalastyle:off println out.println(redirectMsg) // scalastyle:on println From fe79e755b62321dd3863449972208c554b693e9c Mon Sep 17 00:00:00 2001 From: roliu Date: Tue, 17 Mar 2020 16:27:07 -0700 Subject: [PATCH 20/28] changed code to account for zookeeper manager changes --- portServer.sh | 8 ++++++++ .../main/scala/org/apache/livy/server/LivyServer.scala | 4 +++- 2 files changed, 11 insertions(+), 1 deletion(-) create mode 100644 portServer.sh diff --git a/portServer.sh b/portServer.sh new file mode 100644 index 000000000..f12ef3520 --- /dev/null +++ b/portServer.sh @@ -0,0 +1,8 @@ +rm -r ~/livy-server/* + +cp assembly/target/apache-livy-0.8.0-incubating-SNAPSHOT-bin.zip ~/livy-server/ +unzip ~/livy-server/apache-livy-0.8.0-incubating-SNAPSHOT-bin.zip -d ~/livy-server/ +mkdir ~/livy-server/apache-livy-0.8.0-incubating-SNAPSHOT-bin/logs + +kubectl cp ~/livy-server/apache-livy-0.8.0-incubating-SNAPSHOT-bin test/sparkhead-0:/ -c hadoop-livy-sparkhistory +kubectl cp ~/livy-server/apache-livy-0.8.0-incubating-SNAPSHOT-bin test/sparkhead-1:/ -c hadoop-livy-sparkhistory diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index ae9a51fba..7fd0c5647 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -409,9 +409,11 @@ class LivyServer extends Logging { def stop(): Unit = { if (server != null) { info("Shutting down Livy server.") - zkManager.foreach(_.stop()) server.stop() _thriftServerFactory.foreach(_.stop()) + if (livyConf.get(LivyConf.HA_MODE) != HighAvailabilitySettings.HA_ON) { + zkManager.foreach(_.stop()) + } } } From 8178eefce96a9a6a21250afbdbc8240aec36ab82 Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Tue, 17 Mar 2020 16:31:37 -0700 Subject: [PATCH 21/28] removed extraneous file --- portServer.sh | 8 -------- 1 file changed, 8 deletions(-) delete mode 100644 portServer.sh diff --git a/portServer.sh b/portServer.sh deleted file mode 100644 index f12ef3520..000000000 --- a/portServer.sh +++ /dev/null @@ -1,8 +0,0 @@ -rm -r ~/livy-server/* - -cp assembly/target/apache-livy-0.8.0-incubating-SNAPSHOT-bin.zip ~/livy-server/ -unzip ~/livy-server/apache-livy-0.8.0-incubating-SNAPSHOT-bin.zip -d ~/livy-server/ -mkdir ~/livy-server/apache-livy-0.8.0-incubating-SNAPSHOT-bin/logs - -kubectl cp ~/livy-server/apache-livy-0.8.0-incubating-SNAPSHOT-bin test/sparkhead-0:/ -c hadoop-livy-sparkhistory -kubectl cp ~/livy-server/apache-livy-0.8.0-incubating-SNAPSHOT-bin test/sparkhead-1:/ -c hadoop-livy-sparkhistory From eb44f4636a3896e19dbb64b3744969d6adb125fb Mon Sep 17 00:00:00 2001 From: roliu Date: Wed, 18 Mar 2020 11:46:58 -0700 Subject: [PATCH 22/28] fixed scalastyle error --- server/src/main/scala/org/apache/livy/server/LivyServer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 7fd0c5647..97d468876 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -411,9 +411,9 @@ class LivyServer extends Logging { info("Shutting down Livy server.") server.stop() _thriftServerFactory.foreach(_.stop()) - if (livyConf.get(LivyConf.HA_MODE) != HighAvailabilitySettings.HA_ON) { + if (livyConf.get(LivyConf.HA_MODE) != HighAvailabilitySettings.HA_ON) { zkManager.foreach(_.stop()) - } + } } } From 9b1f21b61ccb6efba7d747729cc7037378641409 Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Tue, 24 Mar 2020 12:30:59 -0700 Subject: [PATCH 23/28] reduced verbosity of redirect messages --- .../org/apache/livy/server/CuratorElectorService.scala | 4 ++-- .../org/apache/livy/server/DomainRedirectionFilter.scala | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index b43ad404f..21029d176 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -92,8 +92,8 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, def getCurrentId(): String = { val currentHostname = java.net.InetAddress.getLocalHost().getHostName(); - info(currentHostname) - info(leaderHostnames) + debug(currentHostname) + debug(leaderHostnames) val currentId = leaderIds(leaderHostnames indexOf currentHostname) currentId } diff --git a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala index ef820046c..83aba6403 100644 --- a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala +++ b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala @@ -52,16 +52,17 @@ class DomainRedirectionFilter(HAService: CuratorElectorService) extends Filter override def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain): Unit = { - info("active leader is:" + HAService.getActiveAddress()) - info("current id:" + HAService.getCurrentId()) if (!isLeader()) { + debug("active leader is:" + HAService.getActiveAddress()) + debug("current id:" + HAService.getCurrentId()) + val httpRequest = request.asInstanceOf[HttpServletRequest] val requestURL = httpRequest.getRequestURL().toString() - info(requestURL) + debug(requestURL) val builder = UriComponentsBuilder.fromHttpUrl(requestURL) val redirectURL = builder.host(HAService.getActiveAddress()).toUriString() - info(redirectURL) + debug(redirectURL) val httpServletResponse = response.asInstanceOf[HttpServletResponse]; val redirectMsg = "This is a standby Livy Instance. The redirect url is: " + redirectURL From 6caf0cc1c50cd46046788f4460e3da3f0a1faef5 Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Tue, 24 Mar 2020 12:38:12 -0700 Subject: [PATCH 24/28] improved descriptiveness of log messages --- .../org/apache/livy/server/CuratorElectorService.scala | 5 +++-- .../org/apache/livy/server/DomainRedirectionFilter.scala | 8 ++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index 21029d176..d8810a3cd 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -92,9 +92,10 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, def getCurrentId(): String = { val currentHostname = java.net.InetAddress.getLocalHost().getHostName(); - debug(currentHostname) - debug(leaderHostnames) + debug("This server's current hostname is: " + currentHostname) + debug("This hostnames for valid leaders are: " + leaderHostnames) val currentId = leaderIds(leaderHostnames indexOf currentHostname) + debug("This server's designated ID is: " + currentId) currentId } diff --git a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala index 83aba6403..7664edba5 100644 --- a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala +++ b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala @@ -53,16 +53,16 @@ class DomainRedirectionFilter(HAService: CuratorElectorService) extends Filter response: ServletResponse, chain: FilterChain): Unit = { if (!isLeader()) { - debug("active leader is:" + HAService.getActiveAddress()) - debug("current id:" + HAService.getCurrentId()) + debug("active leader's address is:" + haService.getActiveAddress()) + debug("current id:" + haService.getCurrentId()) val httpRequest = request.asInstanceOf[HttpServletRequest] val requestURL = httpRequest.getRequestURL().toString() - debug(requestURL) + debug("requested url: " + requestURL) val builder = UriComponentsBuilder.fromHttpUrl(requestURL) val redirectURL = builder.host(HAService.getActiveAddress()).toUriString() - debug(redirectURL) + debug("redirected url:" + redirectURL) val httpServletResponse = response.asInstanceOf[HttpServletResponse]; val redirectMsg = "This is a standby Livy Instance. The redirect url is: " + redirectURL From 0e2379b5225696ad5ac8afe6d41a800a6d3fce48 Mon Sep 17 00:00:00 2001 From: Roger Liu Date: Tue, 24 Mar 2020 13:49:57 -0700 Subject: [PATCH 25/28] refactored variable name --- .../org/apache/livy/server/DomainRedirectionFilter.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala index 7664edba5..1cc1207ed 100644 --- a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala +++ b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala @@ -35,7 +35,7 @@ import org.springframework.web.util.UriComponentsBuilder import org.apache.livy.{LivyConf, Logging} -class DomainRedirectionFilter(HAService: CuratorElectorService) extends Filter +class DomainRedirectionFilter(haService: CuratorElectorService) extends Filter with Logging { @@ -44,7 +44,7 @@ class DomainRedirectionFilter(HAService: CuratorElectorService) extends Filter val HEADER_NAME = "X-Requested-By" def isLeader(): Boolean = { - HAService.currentState == HAState.Active + haService.currentState == HAState.Active } override def init(filterConfig: FilterConfig): Unit = {} @@ -61,7 +61,7 @@ class DomainRedirectionFilter(HAService: CuratorElectorService) extends Filter debug("requested url: " + requestURL) val builder = UriComponentsBuilder.fromHttpUrl(requestURL) - val redirectURL = builder.host(HAService.getActiveAddress()).toUriString() + val redirectURL = builder.host(haService.getActiveAddress()).toUriString() debug("redirected url:" + redirectURL) val httpServletResponse = response.asInstanceOf[HttpServletResponse]; From dd87210f75a2ab8c1c19b4856db29c00742927ba Mon Sep 17 00:00:00 2001 From: James Chen Date: Thu, 23 Jul 2020 02:06:49 -0700 Subject: [PATCH 26/28] A few renamings (Synchronizing with local commit 2ca2ae69) --- server/src/main/scala/org/apache/livy/LivyConf.scala | 2 +- .../org/apache/livy/server/CuratorElectorService.scala | 7 ++++--- .../org/apache/livy/server/DomainRedirectionFilter.scala | 3 +-- .../main/scala/org/apache/livy/server/LivyServer.scala | 7 +++---- .../scala/org/apache/livy/sessions/SessionManager.scala | 3 ++- .../livy/server/recovery/CuratorElectorServiceSpec.scala | 2 +- .../org/apache/livy/sessions/SessionManagerSpec.scala | 8 ++++---- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 2d732e3d3..ebf6cfcae 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -201,7 +201,7 @@ object LivyConf { * High Availability mode of Livy. Possible values: * off: Default. Turn off High Availability. * on: Livy uses Zookeeper as a state store to ensure a livy server is always available with the -* correct state. + * correct state. * Must set livy.server.ha.zookeeper-url to configure HA */ val HA_MODE = Entry("livy.server.ha.mode", "off") diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index d8810a3cd..94be580db 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -82,11 +82,11 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, leaderLatch.addListener(this) var currentState = HAState.Standby - def isLeader() { + override def isLeader() { transitionToActive() } - def notLeader(){ + override def notLeader(){ transitionToStandby() } @@ -106,6 +106,7 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, } def start(): Unit = { + server.initHa(this) transitionToStandby() server.start() @@ -146,4 +147,4 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, info("Transition complete") } } -} +} \ No newline at end of file diff --git a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala index 1cc1207ed..cf21b3937 100644 --- a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala +++ b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala @@ -55,7 +55,6 @@ class DomainRedirectionFilter(haService: CuratorElectorService) extends Filter if (!isLeader()) { debug("active leader's address is:" + haService.getActiveAddress()) debug("current id:" + haService.getCurrentId()) - val httpRequest = request.asInstanceOf[HttpServletRequest] val requestURL = httpRequest.getRequestURL().toString() debug("requested url: " + requestURL) @@ -79,4 +78,4 @@ class DomainRedirectionFilter(haService: CuratorElectorService) extends Filter } override def destroy(): Unit = {} -} +} \ No newline at end of file diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 97d468876..4a7648fe6 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -339,8 +339,8 @@ class LivyServer extends Logging { def start(): Unit = { info("Starting HA connection") - interactiveSessionManager.startSessionManager() - batchSessionManager.startSessionManager() + interactiveSessionManager.recoverSessions() + batchSessionManager.recoverSessions() server.start() _thriftServerFactory.foreach { @@ -466,7 +466,6 @@ object LivyServer { if(livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON) { info("Starting HA connection") val electorService: CuratorElectorService = new CuratorElectorService(livyConf, server) - server.initHa(electorService) electorService.start() } else { @@ -478,4 +477,4 @@ object LivyServer { } } } -} +} \ No newline at end of file diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala index f2734fd57..e2245a7a5 100644 --- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala +++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala @@ -84,8 +84,9 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_STATE_RETAIN_TIME)) new GarbageCollector().start() + recoverSessions() - def startSessionManager(): Unit = { + def recoverSessions(): Unit = { idCounter.set(0) sessions.clear() sessionsByName.clear() diff --git a/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala index 8c90cb2ac..091761c77 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala @@ -87,4 +87,4 @@ class CuratorElectorServiceSpec extends FunSpec with LivyBaseUnitTestSuite { } } } -} +} \ No newline at end of file diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index 2c89cb2de..fde67ae43 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -232,7 +232,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit .thenReturn(validMetadata ++ invalidMetadata) val sm = new BatchSessionManager(conf, sessionStore) - sm.startSessionManager() + sm.recoverSessions() sm.nextId() shouldBe nextId validMetadata.foreach { m => sm.get(m.get.id) shouldBe defined @@ -249,7 +249,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit val session = mockSession(sessionId) val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session))) - sm.startSessionManager() + sm.recoverSessions() sm.get(sessionId) shouldBe defined Await.ready(sm.delete(sessionId).get, 30 seconds) @@ -265,7 +265,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit val session = mockSession(sessionId) val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session))) - sm.startSessionManager() + sm.recoverSessions() sm.get(sessionId) shouldBe defined sm.shutdown() @@ -281,7 +281,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit val session = mockSession(sessionId) val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session))) - sm.startSessionManager() + sm.recoverSessions() sm.get(sessionId) shouldBe defined sm.shutdown() From 112ab272f6c16d01bb681690dfc51a6ab8dad969 Mon Sep 17 00:00:00 2001 From: James Chen Date: Thu, 23 Jul 2020 02:06:49 -0700 Subject: [PATCH 27/28] Fixing Livy HA by appending the query string to the redirect URL --- server/src/main/scala/org/apache/livy/LivyConf.scala | 1 + .../org/apache/livy/server/CuratorElectorService.scala | 3 ++- .../org/apache/livy/server/DomainRedirectionFilter.scala | 7 +++++-- .../src/main/scala/org/apache/livy/server/LivyServer.scala | 3 ++- .../livy/server/recovery/CuratorElectorServiceSpec.scala | 3 ++- 5 files changed, 12 insertions(+), 5 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index ebf6cfcae..022847958 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -428,3 +428,4 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { } } + diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index 94be580db..d739c59c4 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -147,4 +147,5 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, info("Transition complete") } } -} \ No newline at end of file +} + diff --git a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala index cf21b3937..e0bfc5e42 100644 --- a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala +++ b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala @@ -56,11 +56,13 @@ class DomainRedirectionFilter(haService: CuratorElectorService) extends Filter debug("active leader's address is:" + haService.getActiveAddress()) debug("current id:" + haService.getCurrentId()) val httpRequest = request.asInstanceOf[HttpServletRequest] + val queryOpt: Option[String] = Option(httpRequest.getQueryString()) val requestURL = httpRequest.getRequestURL().toString() debug("requested url: " + requestURL) val builder = UriComponentsBuilder.fromHttpUrl(requestURL) - val redirectURL = builder.host(haService.getActiveAddress()).toUriString() + val activeURL = builder.host(haService.getActiveAddress()).toUriString() + val redirectURL = if (queryOpt.isEmpty) activeURL else activeURL + "?" + queryOpt.get debug("redirected url:" + redirectURL) val httpServletResponse = response.asInstanceOf[HttpServletResponse]; @@ -78,4 +80,5 @@ class DomainRedirectionFilter(haService: CuratorElectorService) extends Filter } override def destroy(): Unit = {} -} \ No newline at end of file +} + diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 4a7648fe6..32e94afdd 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -477,4 +477,5 @@ object LivyServer { } } } -} \ No newline at end of file +} + diff --git a/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala index 091761c77..8b037b5fb 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/CuratorElectorServiceSpec.scala @@ -87,4 +87,5 @@ class CuratorElectorServiceSpec extends FunSpec with LivyBaseUnitTestSuite { } } } -} \ No newline at end of file +} + From cbdaaafa12544fa29964eeb15912360914555cf2 Mon Sep 17 00:00:00 2001 From: James Chen Date: Thu, 23 Jul 2020 02:06:50 -0700 Subject: [PATCH 28/28] Fixing Livy leader domain omission issue --- server/src/main/scala/org/apache/livy/LivyConf.scala | 3 --- .../org/apache/livy/server/CuratorElectorService.scala | 7 +++---- .../org/apache/livy/server/DomainRedirectionFilter.scala | 4 ++-- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 022847958..398395df6 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -215,9 +215,6 @@ object LivyConf { // The hostnames of all servers used in HA val HA_SERVER_HOSTNAMES = Entry("livy.server.ha.server-hostnames", "") - // The addresses of all servers used in HA - val HA_SERVER_ADDRESSES = Entry("livy.server.ha.server-addresses", "") - /** * For filesystem state store, the path of the state store directory. Please don't use a * filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///. diff --git a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala index d739c59c4..2c3ef0c48 100644 --- a/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala +++ b/server/src/main/scala/org/apache/livy/server/CuratorElectorService.scala @@ -74,7 +74,6 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, val leaderIds = livyConf.configToSeq(LivyConf.HA_SERVER_IDS) val leaderHostnames = livyConf.configToSeq(LivyConf.HA_SERVER_HOSTNAMES) - val leaderAddresses = livyConf.configToSeq(LivyConf.HA_SERVER_HOSTNAMES) var leaderLatch = mockLeaderLatch.getOrElse { new LeaderLatch(client, leaderKey, getCurrentId()) @@ -91,7 +90,7 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, } def getCurrentId(): String = { - val currentHostname = java.net.InetAddress.getLocalHost().getHostName(); + val currentHostname = java.net.InetAddress.getLocalHost().getCanonicalHostName(); debug("This server's current hostname is: " + currentHostname) debug("This hostnames for valid leaders are: " + leaderHostnames) val currentId = leaderIds(leaderHostnames indexOf currentHostname) @@ -99,9 +98,9 @@ class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer, currentId } - def getActiveAddress(): String = { + def getActiveHostname(): String = { val activeLeaderId = leaderLatch.getLeader().getId() - val activeAddress = leaderAddresses(leaderIds indexOf activeLeaderId) + val activeAddress = leaderHostnames(leaderIds indexOf activeLeaderId) activeAddress } diff --git a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala index e0bfc5e42..292fcd03c 100644 --- a/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala +++ b/server/src/main/scala/org/apache/livy/server/DomainRedirectionFilter.scala @@ -53,7 +53,7 @@ class DomainRedirectionFilter(haService: CuratorElectorService) extends Filter response: ServletResponse, chain: FilterChain): Unit = { if (!isLeader()) { - debug("active leader's address is:" + haService.getActiveAddress()) + debug("active leader's hostnames are:" + haService.getActiveHostname()) debug("current id:" + haService.getCurrentId()) val httpRequest = request.asInstanceOf[HttpServletRequest] val queryOpt: Option[String] = Option(httpRequest.getQueryString()) @@ -61,7 +61,7 @@ class DomainRedirectionFilter(haService: CuratorElectorService) extends Filter debug("requested url: " + requestURL) val builder = UriComponentsBuilder.fromHttpUrl(requestURL) - val activeURL = builder.host(haService.getActiveAddress()).toUriString() + val activeURL = builder.host(haService.getActiveHostname()).toUriString() val redirectURL = if (queryOpt.isEmpty) activeURL else activeURL + "?" + queryOpt.get debug("redirected url:" + redirectURL)