Skip to content

Commit

Permalink
[KYUUBI-135][KYUUBI-142]add support for spark 2.4.0 release (#157)
Browse files Browse the repository at this point in the history
* fix #135 spark 2.4.0 support

* fix #142

* import

* add ut

* fix ut

* fix ut

* fix ut

* fix ut
  • Loading branch information
yaooqinn committed Mar 1, 2019
1 parent 185d60a commit 7401ca3
Show file tree
Hide file tree
Showing 21 changed files with 374 additions and 46 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ deploy:

jobs:
include:
- stage: spark2.4
language: scala
script: ./build/mvn clean install -Pspark-2.4 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.3
language: scala
script: ./build/mvn clean install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V
Expand Down
3 changes: 3 additions & 0 deletions kyuubi-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@
<environmentVariables>
<KYUUBI_JAR>${project.build.testOutputDirectory}/kyuubi-server-${project.version}.jar</KYUUBI_JAR>
</environmentVariables>
<systemProperties>
<spark.sql.warehouse.dir>${project.build.testOutputDirectory}/spark-warehouse</spark.sql.warehouse.dir>
</systemProperties>
</configuration>
<executions>
<execution>
Expand Down
12 changes: 8 additions & 4 deletions kyuubi-server/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,11 @@ class SparkEnv (
}

object SparkEnv extends Logging {
import scala.collection.JavaConverters._

info("Loaded Kyuubi Supplied SparkEnv Class...")
private val env = new ConcurrentHashMap[String, SparkEnv]()
@volatile private var env: SparkEnv = _
private val envs = new ConcurrentHashMap[String, SparkEnv]()

private[spark] val driverSystemName = "sparkDriver"
private[spark] val executorSystemName = "sparkExecutor"
Expand All @@ -150,10 +153,11 @@ object SparkEnv extends Logging {
def set(e: SparkEnv) {
if (e == null) {
debug(s"Kyuubi: Removing SparkEnv for $user")
env.remove(user)
envs.remove(user)
} else {
debug(s"Kyuubi: Registering SparkEnv for $user")
env.put(user, e)
envs.put(user, e)
env = e
}
}

Expand All @@ -162,7 +166,7 @@ object SparkEnv extends Logging {
*/
def get: SparkEnv = {
debug(s"Kyuubi: Get SparkEnv for $user")
env.getOrDefault(user, env.values().iterator().next())
envs.getOrDefault(user, envs.values().asScala.headOption.getOrElse(env))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import java.net.URL
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader
import org.apache.spark.util.MutableURLClassLoader

Expand Down Expand Up @@ -56,6 +57,8 @@ private[hive] class IsolatedClientLoader(
val barrierPrefixes: Seq[String] = Seq.empty)
extends Logging {

import KyuubiSparkUtil._

// Check to make sure that the root classloader does not know about Hive.
assert(Try(rootClassLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)

Expand All @@ -75,7 +78,28 @@ private[hive] class IsolatedClientLoader(

/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = synchronized {
new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)

val ctor = classOf[HiveClientImpl].getConstructors.head
if (majorVersion(SPARK_VERSION) == 2 && minorVersion(SPARK_VERSION) > 3) {
val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
ctor.newInstance(
version,
warehouseDir,
sparkConf,
hadoopConf,
config,
baseClassLoader,
this).asInstanceOf[HiveClientImpl]
} else {
ctor.newInstance(
version,
sparkConf,
hadoopConf,
config,
baseClassLoader,
this).asInstanceOf[HiveClientImpl]
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import org.apache.spark.ui.UIUtils._

import yaooqinn.kyuubi.ui.{ExecutionInfo, ExecutionState, SessionInfo}

/** Page for Spark Web UI that shows statistics of the kyuubi server */
class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
/**
* Page for Spark Web UI that shows statistics of the kyuubi server
*/
class KyuubiSessionPage(parent: KyuubiSessionTab) extends WebUIPage("") {

private val listener = parent.listener
private val startTime = Calendar.getInstance().getTime()
Expand All @@ -44,10 +46,10 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
{listener.getOnlineSessionNum} session(s) are online,
running {listener.getTotalRunning} SQL statement(s)
</h4> ++
generateSessionStatsTable() ++
generateSQLStatsTable()
generateSessionStatsTable(request) ++
generateSQLStatsTable(request)
}
UIUtils.headerSparkPage("Kyuubi Server", content, parent, Some(5000))
KyuubiUIUtils.headerSparkPage(request, "Kyuubi Session - Application View", content, parent)
}

/** Generate basic stats of the kyuubi server program */
Expand All @@ -64,7 +66,7 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
}

/** Generate stats of batch statements of the kyuubi server program */
private def generateSQLStatsTable(): Seq[Node] = {
private def generateSQLStatsTable(request: HttpServletRequest): Seq[Node] = {
val numStatement = listener.getExecutionList.size
val table = if (numStatement > 0) {
val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration",
Expand All @@ -73,7 +75,8 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {

def generateDataRow(info: ExecutionInfo): Seq[Node] = {
val jobLink = info.jobId.map { id: String =>
<a href={"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), id)}>
<a href={"%s/jobs/job?id=%s".format(
KyuubiUIUtils.prependBaseUri(request, parent.basePath), id)}>
[{id}]
</a>
}
Expand Down Expand Up @@ -135,16 +138,16 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
}

/** Generate stats of batch sessions of the kyuubi server program */
private def generateSessionStatsTable(): Seq[Node] = {
private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
val sessionList = listener.getSessionList
val numBatches = sessionList.size
val table = if (numBatches > 0) {
val dataRows = sessionList
val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration",
"Total Execute")
def generateDataRow(session: SessionInfo): Seq[Node] = {
val sessionLink = "%s/%s/session?id=%s"
.format(UIUtils.prependBaseUri(parent.basePath), parent.prefix, session.sessionId)
val sessionLink = "%s/%s/session?id=%s".format(
KyuubiUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, session.sessionId)
<tr>
<td> {session.userName} </td>
<td> {session.ip} </td>
Expand Down Expand Up @@ -178,7 +181,6 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
content
}


/**
* Returns a human-readable string representing a duration such as "5 second 35 ms"
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.ui.UIUtils._
import yaooqinn.kyuubi.ui.{ExecutionInfo, ExecutionState}

/** Page for Spark Web UI that shows statistics of jobs running in the thrift server */
class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("session") {
class KyuubiSessionSubPage(parent: KyuubiSessionTab) extends WebUIPage("session") {

private val listener = parent.listener
private val startTime = Calendar.getInstance().getTime
Expand All @@ -52,9 +52,9 @@ class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("sessio
Session created at {formatDate(sessionStat.startTimestamp)},
Total run {sessionStat.totalExecution} SQL
</h4> ++
generateSQLStatsTable(sessionStat.sessionId)
generateSQLStatsTable(request, sessionStat.sessionId)
}
UIUtils.headerSparkPage("Kyuubi Session", content, parent, Some(5000))
KyuubiUIUtils.headerSparkPage(request, "Kyuubi Session", content, parent)
}

/** Generate basic stats of the kyuubi server program */
Expand All @@ -71,7 +71,7 @@ class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("sessio
}

/** Generate stats of batch statements of the kyuubi server program */
private def generateSQLStatsTable(sessionID: String): Seq[Node] = {
private def generateSQLStatsTable(request: HttpServletRequest, sessionID: String): Seq[Node] = {
val executionList = listener.getExecutionList
.filter(_.sessionId == sessionID)
val numStatement = executionList.size
Expand All @@ -82,7 +82,8 @@ class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("sessio

def generateDataRow(info: ExecutionInfo): Seq[Node] = {
val jobLink = info.jobId.map { id: String =>
<a href={"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), id)}>
<a href={"%s/jobs/job?id=%s"
.format(KyuubiUIUtils.prependBaseUri(request, parent.basePath), id)}>
[{id}]
</a>
}
Expand Down Expand Up @@ -142,6 +143,7 @@ class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("sessio
}
<td>{errorSummary}{details}</td>
}

/**
* Returns a human-readable string representing a duration such as "5 second 35 ms"
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,38 @@
package org.apache.spark.ui

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.ui.KyuubiServerTab._
import org.apache.spark.ui.KyuubiSessionTab._

import yaooqinn.kyuubi.ui.{KyuubiServerListener, KyuubiServerMonitor}

/**
* Spark Web UI tab that shows statistics of jobs running in the thrift server.
* This assumes the given SparkContext has enabled its SparkUI.
*/
class KyuubiServerTab(userName: String, sparkContext: SparkContext)
class KyuubiSessionTab(userName: String, sparkContext: SparkContext)
extends SparkUITab(getSparkUI(sparkContext), userName) {

override val name = s"Kyuubi Tab 4 $userName"

val parent = getSparkUI(sparkContext)

// KyuubiServerTab renders by different listener's content, identified by user.
// KyuubiSessionTab renders by different listener's content, identified by user.
val listener = KyuubiServerMonitor.getListener(userName).getOrElse {
val lr = new KyuubiServerListener(sparkContext.conf)
KyuubiServerMonitor.setListener(userName, lr)
lr
}

attachPage(new KyuubiServerPage(this))
attachPage(new KyuubiServerSessionPage(this))
attachPage(new KyuubiSessionPage(this))
attachPage(new KyuubiSessionSubPage(this))
parent.attachTab(this)

def detach() {
getSparkUI(sparkContext).detachTab(this)
}
}

object KyuubiServerTab {
object KyuubiSessionTab {
def getSparkUI(sparkContext: SparkContext): SparkUI = {
sparkContext.ui.getOrElse {
throw new SparkException("Parent SparkUI to attach this tab to not found!")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui

import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.KyuubiSparkUtil._

import yaooqinn.kyuubi.utils.ReflectUtils

object KyuubiUIUtils {

private val className = "org.apache.spark.ui.UIUtils"

/** Returns a spark page with correctly formatted headers */
def headerSparkPage(
request: HttpServletRequest,
title: String,
content: => Seq[Node],
activeTab: SparkUITab): Seq[Node] = {
val methodMirror = ReflectUtils.reflectStaticMethodScala(className, "headerSparkPage")
if (equalOrHigherThan("2.4")) {
methodMirror(request, title, content, activeTab, Some(5000), None, false, false)
.asInstanceOf[Seq[Node]]
} else {
methodMirror(title, content, activeTab, Some(5000), None, false, false)
.asInstanceOf[Seq[Node]]
}
}

def prependBaseUri(
request: HttpServletRequest,
basePath: String = "",
resource: String = ""): String = {
val method = ReflectUtils.reflectStaticMethodScala(className, "prependBaseUri")
if (equalOrHigherThan("2.4")) {
method(request, basePath, resource).asInstanceOf[String]
} else {
method(basePath, resource).asInstanceOf[String]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext}
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil._
import org.apache.spark.sql.SparkSession
import org.apache.spark.ui.KyuubiServerTab
import org.apache.spark.ui.KyuubiSessionTab

import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
import yaooqinn.kyuubi.author.AuthzHelper
Expand Down Expand Up @@ -190,7 +190,7 @@ class SparkSessionWithUGI(
KyuubiServerMonitor.setListener(userName, new KyuubiServerListener(conf))
KyuubiServerMonitor.getListener(userName)
.foreach(_sparkSession.sparkContext.addSparkListener)
val uiTab = new KyuubiServerTab(userName, _sparkSession.sparkContext)
val uiTab = new KyuubiSessionTab(userName, _sparkSession.sparkContext)
KyuubiServerMonitor.addUITab(_sparkSession.sparkContext.sparkUser, uiTab)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ package yaooqinn.kyuubi.ui
import scala.collection.mutable.HashMap

import org.apache.spark.SparkException
import org.apache.spark.ui.KyuubiServerTab
import org.apache.spark.ui.KyuubiSessionTab


object KyuubiServerMonitor {

private[this] val uiTabs = new HashMap[String, KyuubiServerTab]()
private[this] val uiTabs = new HashMap[String, KyuubiSessionTab]()

private[this] val listeners = new HashMap[String, KyuubiServerListener]()

Expand All @@ -37,7 +37,7 @@ object KyuubiServerMonitor {
listeners.get(user)
}

def addUITab(user: String, ui: KyuubiServerTab): Unit = {
def addUITab(user: String, ui: KyuubiSessionTab): Unit = {
uiTabs.put(user, ui)
}

Expand Down

0 comments on commit 7401ca3

Please sign in to comment.