Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
477b438
Added support for Active/Passive LivyHA
RogPodge Aug 24, 2019
1a2bd91
Fixed error with the conf
RogPodge Aug 26, 2019
61e06f2
fixed error with livy conf
RogPodge Aug 26, 2019
945f5ce
formatting fixes
RogPodge Aug 26, 2019
de6f885
added back another missing configuration
RogPodge Aug 26, 2019
048c1a2
fixed style errors
RogPodge Aug 27, 2019
31df79a
spelling comment resolution
RogPodge Aug 27, 2019
dd5da8b
fixed string format error
RogPodge Aug 28, 2019
c73bdea
reverted spelling change and addressed threading code comment
RogPodge Sep 5, 2019
b8a4d17
style fix
RogPodge Sep 5, 2019
af0cf82
style fixes
RogPodge Sep 19, 2019
46276f2
Merge remote-tracking branch 'upstream/master'
RogPodge Feb 4, 2020
5c806fc
merged with master
RogPodge Mar 9, 2020
19e266b
added a unit test spec suite for the curator elector service
RogPodge Mar 10, 2020
972ca3d
retooled livy HA to also do domain redirection
RogPodge Mar 11, 2020
fdf449d
updated specs to account for the start()/init() split
RogPodge Mar 11, 2020
636ac05
scalastyle fixes
RogPodge Mar 12, 2020
2423e4d
fixed typo
RogPodge Mar 12, 2020
14741eb
fixed remaining scalastyle error
RogPodge Mar 12, 2020
de250f5
updated minicluster.scala to reflect the init() and start() split
RogPodge Mar 13, 2020
988c219
refactored code to idenitfy the current server based on hostname and …
RogPodge Mar 17, 2020
ff3fd42
Merge branch 'LivyHA' of https://github.com/RogPodge/incubator-livy i…
RogPodge Mar 17, 2020
dd47e37
fixed typo
RogPodge Mar 17, 2020
fe79e75
changed code to account for zookeeper manager changes
RogPodge Mar 17, 2020
8178eef
removed extraneous file
RogPodge Mar 17, 2020
eb44f46
fixed scalastyle error
RogPodge Mar 18, 2020
9b1f21b
reduced verbosity of redirect messages
RogPodge Mar 24, 2020
1643f05
Merge branch 'LivyHA' of https://github.com/RogPodge/incubator-livy i…
RogPodge Mar 24, 2020
6caf0cc
improved descriptiveness of log messages
RogPodge Mar 24, 2020
0e2379b
refactored variable name
RogPodge Mar 24, 2020
dd87210
A few renamings (Synchronizing with local commit 2ca2ae69)
Jul 23, 2020
112ab27
Fixing Livy HA by appending the query string to the redirect URL
Jul 23, 2020
cbdaaaf
Fixing Livy leader domain omission issue
Jul 23, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,25 @@
# on user request and then livy server classpath automatically.
# livy.repl.enable-hive-context =

# High Availability mode of Livy. Possible values:
# off: Default. Turn off High Availability.
# on: Livy uses Zookeeper as a state store to ensure a livy server is always available with the
# correct state.
# Must set livy.server.ha.zookeeper-url to configure HA
# livy.server.ha.mode = off
Comment thread
RogPodge marked this conversation as resolved.

# The address of the ZooKeeper servers used for HA, e.g. host1:port1,host2:port2
# livy.server.ha.zookeeper-url =

# The list of ids for all livy servers used for HA
# livy.server.ha.server-ids =

# The list of hostnames for all livy servers used for HA
# livy.server.ha.server-hostnames =

# The list of server addresses for all livy servers used for HA
# livy.server.ha.server-addresses =

# Recovery mode of Livy. Possible values:
# off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions.
# recovery: Livy persists session info to the state store. When Livy restarts, it recovers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ object MiniLivyMain extends MiniClusterBase {
saveProperties(livyConf, new File(configPath + "/livy.conf"))

val server = new LivyServer()
server.init()
server.start()
server.livyConf.set(LivyConf.ENABLE_HIVE_CONTEXT, false)
// Write a serverUrl.conf file to the conf directory with the location of the Livy
Expand Down
6 changes: 6 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>

<dependency>
<groupId>org.scalatra</groupId>
<artifactId>scalatra-test_${scala.binary.version}</artifactId>
Expand Down
20 changes: 20 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,25 @@ object LivyConf {
* zookeeper: Store state in a Zookeeper instance.
*/
val RECOVERY_STATE_STORE = Entry("livy.server.recovery.state-store", null)

/**
* High Availability mode of Livy. Possible values:
* off: Default. Turn off High Availability.
* on: Livy uses ZooKeeper to elect a single active Livy server among the configured servers,
* so that a Livy server is always available with the correct state.
* livy.server.ha.zookeeper-url must be set when HA is turned on.
*/
val HA_MODE = Entry("livy.server.ha.mode", "off")

// The address of the ZooKeeper servers used for HA, e.g. host1:port1,host2:port2.
// CuratorElectorService refuses to start when this is empty.
val HA_ZOOKEEPER_URL = Entry("livy.server.ha.zookeeper-url", "")

// The ids of all servers used in HA. The i-th id belongs to the i-th entry of
// livy.server.ha.server-hostnames (CuratorElectorService pairs the two lists by position).
val HA_SERVER_IDS = Entry("livy.server.ha.server-ids", "")

// The hostnames of all servers used in HA, in the same order as livy.server.ha.server-ids.
// A server finds its own id by looking up its canonical hostname in this list.
val HA_SERVER_HOSTNAMES = Entry("livy.server.ha.server-hostnames", "")

/**
* For filesystem state store, the path of the state store directory. Please don't use a
* filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///.
Expand Down Expand Up @@ -406,3 +425,4 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.server

import java.io.Closeable
import java.io.IOException
import java.net.InetAddress
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.framework.recipes.leader.LeaderLatchListener
import org.apache.curator.retry.RetryNTimes

import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.LivyConf.Entry

/** Configuration entries used only by the Curator-based leader election. */
object CuratorElectorService {

  /** First path component of the ZooKeeper key that holds the leader latch. */
  val HA_KEY_PREFIX_CONF: Entry = Entry("livy.server.ha.key-prefix", "livy_ha")

  /** Retry policy, written as "<max retry count>,<sleep ms between retry>". */
  val HA_RETRY_CONF: Entry = Entry("livy.server.ha.retry-policy", "5,100")
}

/** The role a Livy server currently holds in an HA deployment. */
object HAState extends Enumeration {
  type HAState = Value

  /** This server won the leader election; requests are passed through to it. */
  val Active = Value

  /** This server is not the leader; requests are redirected to the active server. */
  val Standby = Value
}


/**
 * Takes part in a ZooKeeper leader election through a Curator LeaderLatch and tracks whether
 * this Livy server is currently Active or Standby.
 *
 * @param livyConf configuration the ZooKeeper address, key prefix, retry policy and the
 *                 server id/hostname lists are read from
 * @param livyServer the server that is started by start() and restarted when this instance
 *                   becomes the leader
 * @param mockCuratorClient when defined, used instead of creating a Curator client
 * @param mockLeaderLatch when defined, used instead of creating a LeaderLatch
 */
class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer,
mockCuratorClient: Option[CuratorFramework] = None,
mockLeaderLatch: Option[LeaderLatch] = None)
extends LeaderLatchListener
with Logging
{
import CuratorElectorService._
import HAState._

// ZooKeeper connection string; construction fails fast when it has not been configured.
val haAddress = livyConf.get(LivyConf.HA_ZOOKEEPER_URL)
require(!haAddress.isEmpty, s"Please configure ${LivyConf.HA_ZOOKEEPER_URL.key}.")
// First path component of the leader key in ZooKeeper.
val haKeyPrefix = livyConf.get(HA_KEY_PREFIX_CONF)
// Raw retry setting; parsed into a retry policy below.
val retryValue = livyConf.get(HA_RETRY_CONF)
// Matches "m,n" (optional whitespace around either number), where m and n are runs of
// decimal digits; the two numbers are captured as groups 1 and 2.
val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be easier to have two config values?
retry_count and sleep_between_retries_ms

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the two values should be paired together in a single config entry for clarity. Also, we're following the example in ZooKeeperStateStore.scala.

@o-shevchenko o-shevchenko Aug 28, 2019

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi,
just FYI
I have moved this configuration to ZooKeeperManager in #189. Replaced with two separated configs.
Perhaps we need to merge this PR after #189, or, if this PR is merged first, I need to refactor my code to use your approach (though I think two separate properties are clearer than parsing one property), so that we don't do the same work twice (like the configuration refactoring).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, we can adjust based on whichever PR is merged first.

// Curator retry policy parsed from the "<max retry count>,<sleep ms between retry>" setting.
// A value that does not match retryPattern is a configuration error, so construction fails
// with a message that names the retry-policy entry.
val retryPolicy = retryValue match {
  case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt)
  case _ => throw new IllegalArgumentException(
    s"${HA_RETRY_CONF.key} contains bad value: $retryValue. " +
    "Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
}

// The Livy server controlled by this service (started in start(), restarted on promotion).
var server : LivyServer = livyServer

// Curator client used for the election; a supplied mock takes precedence over a real client.
val client: CuratorFramework = mockCuratorClient.getOrElse {
CuratorFrameworkFactory.newClient(haAddress, retryPolicy)
}
// ZooKeeper path of the leader latch.
val leaderKey = s"/$haKeyPrefix/leader"

// Parallel lists: leaderIds(i) is the id of the server whose hostname is leaderHostnames(i).
// Both must be initialized before leaderLatch, because getCurrentId() reads them.
val leaderIds = livyConf.configToSeq(LivyConf.HA_SERVER_IDS)
val leaderHostnames = livyConf.configToSeq(LivyConf.HA_SERVER_HOSTNAMES)

// The latch registers this server under its configured id, so getLeader().getId() can later
// be mapped back to a hostname. A supplied mock takes precedence over a real latch.
var leaderLatch = mockLeaderLatch.getOrElse {
new LeaderLatch(client, leaderKey, getCurrentId())
}
// NOTE(review): `this` is handed to the latch while the constructor is still running
// (currentState is initialized further down); the latch is only started in start().
leaderLatch.addListener(this)

// Current HA role of this server. It is updated from the LeaderLatch listener callbacks below
// and read by DomainRedirectionFilter while it handles requests, so it is marked @volatile to
// make every update visible to the reading threads.
@volatile var currentState: HAState.Value = HAState.Standby

/** LeaderLatchListener callback: this server has become the leader. */
override def isLeader(): Unit = transitionToActive()

/** LeaderLatchListener callback: this server is no longer the leader. */
override def notLeader(): Unit = transitionToStandby()

/**
 * Looks up the id configured for this machine.
 *
 * The canonical hostname of the local host is searched in the configured hostname list and
 * the id at the same position of the configured id list is returned.
 *
 * @return the id paired with this machine's canonical hostname
 * @throws IllegalArgumentException if the hostname is not in the hostname list, or the id
 *                                  list has no entry at that position
 * @throws java.net.UnknownHostException if the local hostname cannot be resolved
 */
def getCurrentId(): String = {
  val currentHostname = java.net.InetAddress.getLocalHost().getCanonicalHostName()
  debug("This server's current hostname is: " + currentHostname)
  debug("This hostnames for valid leaders are: " + leaderHostnames)
  // indexOf yields -1 for an unknown hostname; lift turns any invalid index into None so the
  // misconfiguration is reported by name instead of as a bare IndexOutOfBoundsException.
  val currentId = leaderIds.lift(leaderHostnames.indexOf(currentHostname)).getOrElse {
    throw new IllegalArgumentException(
      s"Hostname $currentHostname has no matching id: it must be listed in " +
      s"${LivyConf.HA_SERVER_HOSTNAMES.key} ($leaderHostnames) and have an id at the same " +
      s"position in ${LivyConf.HA_SERVER_IDS.key} ($leaderIds).")
  }
  debug("This server's designated ID is: " + currentId)
  currentId
}

/**
 * Resolves the hostname of the server that currently holds the leader latch.
 *
 * @return the hostname paired with the current leader's id
 * @throws IllegalStateException if the id reported by the latch is not one of the configured
 *                               ids, or the hostname list has no entry at that position
 */
def getActiveHostname(): String = {
  val activeLeaderId = leaderLatch.getLeader().getId()
  // indexOf yields -1 when the reported id is unknown; lift maps any invalid index to None so
  // callers get an explanatory error instead of a bare IndexOutOfBoundsException.
  leaderHostnames.lift(leaderIds.indexOf(activeLeaderId)).getOrElse {
    throw new IllegalStateException(
      s"Cannot resolve the active Livy server: leader id '$activeLeaderId' has no matching " +
      s"entry in ${LivyConf.HA_SERVER_IDS.key} ($leaderIds) / " +
      s"${LivyConf.HA_SERVER_HOSTNAMES.key} ($leaderHostnames).")
  }
}

/**
 * Registers this service with the Livy server, starts the server, the Curator client and the
 * leader latch, and then blocks the calling thread.
 */
def start(): Unit = {
// Hand this service to the server before anything is started.
server.initHa(this)
// currentState is initialized to Standby, so at this point the call only logs.
transitionToStandby()

// Order matters: the latch uses the client, so the client is started first.
server.start()
client.start()
leaderLatch.start()

try {
// The calling thread waits on itself, so this does not return normally.
// NOTE(review): presumably intended to keep the process alive while the latch callbacks
// drive the state transitions - confirm.
Thread.currentThread.join()
} finally {
// Reached only when the wait above is aborted (e.g. the thread is interrupted).
transitionToStandby()
}
}

/**
 * Leaves the election and releases the ZooKeeper connection.
 *
 * The server is first marked as Standby, then the leader latch is closed, and finally the
 * Curator client is closed so that its connection is not leaked.
 */
def close(): Unit = {
  transitionToStandby()
  try {
    leaderLatch.close()
  } finally {
    // The latch is built on top of the client, so the client is closed last. It is closed
    // even when closing the latch fails.
    client.close()
  }
}

/**
 * Promotes this server to Active.
 *
 * Does nothing but log when the server is already Active. Otherwise the Livy server is
 * restarted, and only after the restart has returned is the state switched to Active.
 */
def transitionToActive(): Unit = {
  info("Transitioning to Active state")
  currentState match {
    case HAState.Active =>
      info("Already in Active State")
    case _ =>
      server.restart()
      currentState = HAState.Active
      info("Transition complete")
  }
}

/**
 * Demotes this server to Standby.
 *
 * Only the tracked state changes here; when the server is already Standby the call just logs.
 */
def transitionToStandby(): Unit = {
  info("Transitioning to Standby state")
  val needsTransition = currentState != HAState.Standby
  if (needsTransition) {
    currentState = HAState.Standby
    info("Transition complete")
  } else {
    info("Already in Standby State")
  }
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.server

import java.io.IOException
import java.net.URL
import javax.servlet.Filter
import javax.servlet.FilterChain
import javax.servlet.FilterConfig
import javax.servlet.ServletContext
import javax.servlet.ServletException
import javax.servlet.ServletRequest
import javax.servlet.ServletRequestWrapper
import javax.servlet.ServletResponse
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletRequestWrapper
import javax.servlet.http.HttpServletResponse

import org.springframework.web.util.UriComponentsBuilder

import org.apache.livy.{LivyConf, Logging}

/**
 * Servlet filter that keeps a standby Livy server from handling requests itself.
 *
 * While the local server is Active, requests continue down the filter chain. Otherwise the
 * request is answered with a 307 (temporary redirect) whose Location is the requested URL with
 * its host replaced by the hostname of the active server.
 *
 * @param haService election service that reports this server's state and the active hostname
 */
class DomainRedirectionFilter(haService: CuratorElectorService) extends Filter
  with Logging
{

  // NOTE(review): METHODS_TO_IGNORE and HEADER_NAME are not referenced inside this class; they
  // are kept because they are public members.
  val METHODS_TO_IGNORE = Set("GET", "OPTIONS", "HEAD")

  val HEADER_NAME = "X-Requested-By"

  /** @return true when the election service reports this server as Active */
  def isLeader(): Boolean = {
    haService.currentState == HAState.Active
  }

  override def init(filterConfig: FilterConfig): Unit = {}

  /**
   * Passes the request on when this server is Active, otherwise redirects it to the active
   * server.
   */
  override def doFilter(request: ServletRequest,
      response: ServletResponse,
      chain: FilterChain): Unit = {
    if (isLeader()) {
      chain.doFilter(request, response)
    } else {
      redirectToActive(
        request.asInstanceOf[HttpServletRequest],
        response.asInstanceOf[HttpServletResponse])
    }
  }

  /** Answers `httpRequest` with a temporary redirect to the same URL on the active server. */
  private def redirectToActive(
      httpRequest: HttpServletRequest,
      httpResponse: HttpServletResponse): Unit = {
    val activeHostname = haService.getActiveHostname()
    debug("active leader's hostnames are:" + activeHostname)
    debug("current id:" + haService.getCurrentId())

    val requestURL = httpRequest.getRequestURL().toString()
    debug("requested url: " + requestURL)

    // NOTE(review): confirm that toUriString() does not re-encode a request URL that already
    // contains percent-encoded characters.
    val activeURL = UriComponentsBuilder.fromHttpUrl(requestURL)
      .host(activeHostname)
      .toUriString()
    // The query string is read separately and re-attached verbatim; getQueryString() is null
    // when the request has none.
    val redirectURL = Option(httpRequest.getQueryString())
      .fold(activeURL)(query => activeURL + "?" + query)
    debug("redirected url:" + redirectURL)

    // Status and Location go first: headers set after the response has been committed are
    // ignored, so they must not depend on the body still sitting in the buffer.
    httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT)
    httpResponse.setHeader("Location", redirectURL)

    val redirectMsg = "This is a standby Livy Instance. The redirect url is: " + redirectURL
    // scalastyle:off println
    httpResponse.getWriter().println(redirectMsg)
    // scalastyle:on println
  }

  override def destroy(): Unit = {}
}

Loading