Skip to content

Commit

Permalink
[KYUUBI #3098] Unify the event log code path
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

close #3098 (comment)
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3090 from zhaomin1423/event_spi.

Closes #3098

322e48d [Min] clear unnecessary variable
22b448d [Min] fix review
a452321 [Min] [KYUUBI #3098] Unify the event log code path

Authored-by: Min <zhaomin1423@163.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
  • Loading branch information
zhaomin1423 authored and ulysses-you committed Aug 11, 2022
1 parent 6aa898e commit d086525
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.kyuubi.engine.hive

import java.net.InetAddress
import java.security.PrivilegedExceptionAction

import scala.util.control.NonFatal
Expand All @@ -27,15 +26,14 @@ import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_EVENT_JSON_LOG_PATH, ENGINE_EVENT_LOGGERS}
import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_LOGGERS
import org.apache.kyuubi.engine.hive.HiveSQLEngine.currentEngine
import org.apache.kyuubi.engine.hive.events.HiveEngineEvent
import org.apache.kyuubi.engine.hive.events.handler.HiveJsonLoggingEventHandler
import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent}
import org.apache.kyuubi.engine.hive.events.{HiveEngineEvent, HiveEventHandlerRegister}
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
import org.apache.kyuubi.util.SignalRegister

class HiveSQLEngine extends Serverable("HiveSQLEngine") {
override val backendService: AbstractBackendService = new HiveBackendService(this)
Expand Down Expand Up @@ -115,20 +113,7 @@ object HiveSQLEngine extends Logging {
}

private def initLoggerEventHandler(conf: KyuubiConf): Unit = {
val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
conf.get(ENGINE_EVENT_LOGGERS).map(EventLoggerType.withName).foreach {
case EventLoggerType.JSON =>
val hostName = InetAddress.getLocalHost.getCanonicalHostName
val handler = HiveJsonLoggingEventHandler(
s"Hive-$hostName",
ENGINE_EVENT_JSON_LOG_PATH,
hadoopConf,
conf)
EventBus.register[KyuubiEvent](handler)
case logger =>
throw new IllegalArgumentException(s"Unrecognized event logger: $logger")

}
HiveEventHandlerRegister.registerEngineEventLoggers(conf)
}

def main(args: Array[String]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.hive.events

import java.net.InetAddress

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH
import org.apache.kyuubi.engine.hive.events.handler.HiveJsonLoggingEventHandler
import org.apache.kyuubi.events.{EventHandlerRegister, KyuubiEvent}
import org.apache.kyuubi.events.handler.EventHandler
import org.apache.kyuubi.util.KyuubiHadoopUtils

object HiveEventHandlerRegister extends EventHandlerRegister {

override protected def createJsonEventHandler(
kyuubiConf: KyuubiConf): EventHandler[KyuubiEvent] = {
val hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)
val hostName = InetAddress.getLocalHost.getCanonicalHostName
HiveJsonLoggingEventHandler(
s"Hive-$hostName",
ENGINE_EVENT_JSON_LOG_PATH,
hadoopConf,
kyuubiConf)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore}
import org.apache.kyuubi.engine.spark.events.handler.{SparkHistoryLoggingEventHandler, SparkJsonLoggingEventHandler}
import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister}
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
Expand Down Expand Up @@ -235,27 +234,9 @@ object SparkSQLEngine extends Logging {
}

def initLoggerEventHandler(conf: KyuubiConf): Unit = {
conf.get(ENGINE_EVENT_LOGGERS)
.map(EventLoggerType.withName)
.foreach {
case EventLoggerType.SPARK =>
EventBus.register[KyuubiEvent](new SparkHistoryLoggingEventHandler(spark.sparkContext))
case EventLoggerType.JSON =>
val handler = SparkJsonLoggingEventHandler(
spark.sparkContext.applicationAttemptId
.map(id => s"${spark.sparkContext.applicationId}_$id")
.getOrElse(spark.sparkContext.applicationId),
ENGINE_EVENT_JSON_LOG_PATH,
spark.sparkContext.hadoopConfiguration,
conf)

// register JsonLogger as a event handler for default event bus
EventBus.register[KyuubiEvent](handler)
case _ =>
}

val sparkEventRegister = new SparkEventHandlerRegister(spark)
sparkEventRegister.registerEngineEventLoggers(conf)
}

}

def main(args: Array[String]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.events

import org.apache.spark.sql.SparkSession

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH
import org.apache.kyuubi.engine.spark.events.handler.{SparkHistoryLoggingEventHandler, SparkJsonLoggingEventHandler}
import org.apache.kyuubi.events.{EventHandlerRegister, KyuubiEvent}
import org.apache.kyuubi.events.handler.EventHandler

class SparkEventHandlerRegister(spark: SparkSession) extends EventHandlerRegister {

override protected def createSparkEventHandler(kyuubiConf: KyuubiConf)
: EventHandler[KyuubiEvent] = {
new SparkHistoryLoggingEventHandler(spark.sparkContext)
}

override protected def createJsonEventHandler(kyuubiConf: KyuubiConf)
: EventHandler[KyuubiEvent] = {
SparkJsonLoggingEventHandler(
spark.sparkContext.applicationAttemptId
.map(id => s"${spark.sparkContext.applicationId}_$id")
.getOrElse(spark.sparkContext.applicationId),
ENGINE_EVENT_JSON_LOG_PATH,
spark.sparkContext.hadoopConfiguration,
kyuubiConf)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,21 @@

package org.apache.kyuubi.engine.trino

import java.net.InetAddress
import java.util.concurrent.CountDownLatch

import scala.util.control.NonFatal

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.TRINO_ENGINE_SHUTDOWN_PRIORITY
import org.apache.kyuubi.Utils.addShutdownHook
import org.apache.kyuubi.Utils.{addShutdownHook, TRINO_ENGINE_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_EVENT_JSON_LOG_PATH, ENGINE_EVENT_LOGGERS}
import org.apache.kyuubi.engine.trino.TrinoSqlEngine.countDownLatch
import org.apache.kyuubi.engine.trino.TrinoSqlEngine.currentEngine
import org.apache.kyuubi.engine.trino.event.TrinoEngineEvent
import org.apache.kyuubi.engine.trino.event.handler.TrinoJsonLoggingEventHandler
import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent}
import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_LOGGERS
import org.apache.kyuubi.engine.trino.TrinoSqlEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.engine.trino.event.{TrinoEngineEvent, TrinoEventHandlerRegister}
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
import org.apache.kyuubi.util.SignalRegister

case class TrinoSqlEngine()
extends Serverable("TrinoSQLEngine") {
Expand Down Expand Up @@ -92,19 +88,7 @@ object TrinoSqlEngine extends Logging {
}

private def initLoggerEventHandler(conf: KyuubiConf): Unit = {
val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
conf.get(ENGINE_EVENT_LOGGERS).map(EventLoggerType.withName).foreach {
case EventLoggerType.JSON =>
val hostName = InetAddress.getLocalHost.getCanonicalHostName
val handler = TrinoJsonLoggingEventHandler(
s"Trino-$hostName",
ENGINE_EVENT_JSON_LOG_PATH,
hadoopConf,
conf)
EventBus.register[KyuubiEvent](handler)
case logger =>
throw new IllegalArgumentException(s"Unrecognized event logger: $logger")
}
TrinoEventHandlerRegister.registerEngineEventLoggers(conf)
}

def main(args: Array[String]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.trino.event

import java.net.InetAddress

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH
import org.apache.kyuubi.engine.trino.event.handler.TrinoJsonLoggingEventHandler
import org.apache.kyuubi.events.{EventHandlerRegister, KyuubiEvent}
import org.apache.kyuubi.events.handler.EventHandler
import org.apache.kyuubi.util.KyuubiHadoopUtils

object TrinoEventHandlerRegister extends EventHandlerRegister {

override protected def createJsonEventHandler(kyuubiConf: KyuubiConf)
: EventHandler[KyuubiEvent] = {
val hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)
val hostName = InetAddress.getLocalHost.getCanonicalHostName
TrinoJsonLoggingEventHandler(
s"Trino-$hostName",
ENGINE_EVENT_JSON_LOG_PATH,
hadoopConf,
kyuubiConf)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.events

import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_EVENT_LOGGERS, SERVER_EVENT_LOGGERS}
import org.apache.kyuubi.events.EventLoggerType.EventLoggerType
import org.apache.kyuubi.events.handler.EventHandler

trait EventHandlerRegister extends Logging {

def registerEngineEventLoggers(conf: KyuubiConf): Unit = {
val loggers = conf.get(ENGINE_EVENT_LOGGERS)
register(loggers, conf)
}

def registerServerEventLoggers(conf: KyuubiConf): Unit = {
val loggers = conf.get(SERVER_EVENT_LOGGERS)
register(loggers, conf)
}

private def register(loggers: Seq[String], conf: KyuubiConf): Unit = {
loggers
.map(EventLoggerType.withName)
.foreach { logger =>
EventBus.register(loadEventHandler(logger, conf))
}
}

protected def createSparkEventHandler(kyuubiConf: KyuubiConf): EventHandler[KyuubiEvent] = {
throw new KyuubiException(s"Unsupported spark event logger.")
}

protected def createJsonEventHandler(kyuubiConf: KyuubiConf): EventHandler[KyuubiEvent] = {
throw new KyuubiException(s"Unsupported json event logger.")
}

protected def createJdbcEventHandler(kyuubiConf: KyuubiConf): EventHandler[KyuubiEvent] = {
throw new KyuubiException(s"Unsupported jdbc event logger.")
}

private def loadEventHandler(
eventLoggerType: EventLoggerType,
kyuubiConf: KyuubiConf): EventHandler[KyuubiEvent] = {
eventLoggerType match {
case EventLoggerType.SPARK =>
createSparkEventHandler(kyuubiConf)

case EventLoggerType.JSON =>
createJsonEventHandler(kyuubiConf)

case EventLoggerType.JDBC =>
createJdbcEventHandler(kyuubiConf)

case other =>
throw new KyuubiException(s"Unsupported event logger: ${other.toString}")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.events

import java.net.InetAddress

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.SERVER_EVENT_JSON_LOG_PATH
import org.apache.kyuubi.events.handler.{EventHandler, ServerJsonLoggingEventHandler}
import org.apache.kyuubi.util.KyuubiHadoopUtils

object ServerEventHandlerRegister extends EventHandlerRegister {

override protected def createJsonEventHandler(kyuubiConf: KyuubiConf)
: EventHandler[KyuubiEvent] = {
val hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)
val hostName = InetAddress.getLocalHost.getCanonicalHostName
ServerJsonLoggingEventHandler(
s"server-$hostName",
SERVER_EVENT_JSON_LOG_PATH,
hadoopConf,
kyuubiConf)
}
}
Loading

0 comments on commit d086525

Please sign in to comment.