Skip to content

Commit 6fe6975

Browse files
iodone authored and ulysses-you committed
[KYUUBI #1798] Add EventBus module to unify the distribution and subscription of Kyuubi's events
### _Why are the changes needed?_ Use case is simple: ``` // scribe sync EventBus.register[Test0KyuubiEvent](new Test0Handler with EventHandler) // scribe async EventBus.registerAsync[Test0KyuubiEvent](new Test0Handler with EventHandler) // send event EventBus.post(KyuubiEvent("test0")) EventBus.post(KyuubiEvent("test1")) ``` ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1853 from iodone/dev. Closes #1798 7c01700 [odone] Updated d2ecb42 [odone] [Fixed] address some reviews 09a467f [odone] [Removed] Remove some mistakenly changed content 5136fe4 [odone] [Update] JsonLoggerEventHandler instead of LoggingService 29b4f8a [odone] [Update] JsonLoggerEventHandler instead of LoggingService a2f90bb [odone] [KYUUBI #1798] Added: Add async event handler execution 21be135 [odone] Added: more test case 10906cf [odone] Added: eventHandler can scribe subclass event 2c09b75 [odone] Added: Event Bus bb05f77 [odone] Added: Event Bus Authored-by: odone <odone.zhang@gmail.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent dcc71b3 commit 6fe6975

File tree

19 files changed

+458
-261
lines changed

19 files changed

+458
-261
lines changed

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala

Lines changed: 33 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -34,8 +34,9 @@ import org.apache.kyuubi.config.KyuubiConf
3434
import org.apache.kyuubi.config.KyuubiConf._
3535
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
3636
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
37-
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventLoggingService}
38-
import org.apache.kyuubi.events.EventLogging
37+
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore}
38+
import org.apache.kyuubi.engine.spark.events.handler.{SparkHistoryLoggingEventHandler, SparkJsonLoggingEventHandler}
39+
import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent}
3940
import org.apache.kyuubi.ha.HighAvailabilityConf._
4041
import org.apache.kyuubi.ha.client.RetryPolicies
4142
import org.apache.kyuubi.service.Serverable
@@ -144,20 +145,18 @@ object SparkSQLEngine extends Logging {
144145
def startEngine(spark: SparkSession): Unit = {
145146
currentEngine = Some(new SparkSQLEngine(spark))
146147
currentEngine.foreach { engine =>
147-
// start event logging ahead so that we can capture all statuses
148-
val eventLogging = new SparkEventLoggingService(spark.sparkContext)
149148
try {
150-
eventLogging.initialize(kyuubiConf)
151-
eventLogging.start()
149+
// start event logging ahead so that we can capture all statuses
150+
initLoggerEventHandler(kyuubiConf)
152151
} catch {
153152
case NonFatal(e) =>
154-
// Don't block the main process if the `EventLoggingService` failed to start
155-
warn(s"Failed to initialize EventLoggingService: ${e.getMessage}", e)
153+
// Don't block the main process if the `LoggerEventHandler` failed to start
154+
warn(s"Failed to initialize LoggerEventHandler: ${e.getMessage}", e)
156155
}
157156

158157
try {
159158
engine.initialize(kyuubiConf)
160-
EventLogging.onEvent(EngineEvent(engine))
159+
EventBus.post(EngineEvent(engine))
161160
} catch {
162161
case t: Throwable =>
163162
throw new KyuubiException(s"Failed to initialize SparkSQLEngine: ${t.getMessage}", t)
@@ -173,15 +172,37 @@ object SparkSQLEngine extends Logging {
173172
kyuubiConf)
174173
val event = EngineEvent(engine)
175174
info(event)
176-
EventLogging.onEvent(event)
175+
EventBus.post(event)
177176
} catch {
178177
case t: Throwable =>
179178
throw new KyuubiException(s"Failed to start SparkSQLEngine: ${t.getMessage}", t)
180179
}
181180
// Stop engine before SparkContext stopped to avoid calling a stopped SparkContext
182181
addShutdownHook(() => engine.stop(), SPARK_CONTEXT_SHUTDOWN_PRIORITY + 2)
183-
addShutdownHook(() => eventLogging.stop(), SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1)
184182
}
183+
184+
def initLoggerEventHandler(conf: KyuubiConf): Unit = {
185+
conf.get(ENGINE_EVENT_LOGGERS)
186+
.map(EventLoggerType.withName)
187+
.foreach {
188+
case EventLoggerType.SPARK =>
189+
EventBus.register[KyuubiEvent](new SparkHistoryLoggingEventHandler(spark.sparkContext))
190+
case EventLoggerType.JSON =>
191+
val handler = SparkJsonLoggingEventHandler(
192+
spark.sparkContext.applicationAttemptId
193+
.map(id => s"${spark.sparkContext.applicationId}_$id")
194+
.getOrElse(spark.sparkContext.applicationId),
195+
ENGINE_EVENT_JSON_LOG_PATH,
196+
spark.sparkContext.hadoopConfiguration,
197+
conf)
198+
199+
// register JsonLogger as a event handler for default event bus
200+
EventBus.register[KyuubiEvent](handler)
201+
case _ =>
202+
}
203+
204+
}
205+
185206
}
186207

187208
def main(args: Array[String]): Unit = {
@@ -211,7 +232,7 @@ object SparkSQLEngine extends Logging {
211232
case Some(engine) =>
212233
engine.stop()
213234
val event = EngineEvent(engine).copy(diagnostic = e.getMessage)
214-
EventLogging.onEvent(event)
235+
EventBus.post(event)
215236
error(event, e)
216237
case _ => error("Current SparkSQLEngine is not created.")
217238
}

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkEventLoggingService.scala

Lines changed: 0 additions & 50 deletions
This file was deleted.
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.spark.events.handler
19+
20+
import org.apache.spark.SparkContext
21+
import org.apache.spark.kyuubi.SparkContextHelper
22+
23+
import org.apache.kyuubi.events.KyuubiEvent
24+
import org.apache.kyuubi.events.handler.EventHandler
25+
26+
class SparkHistoryLoggingEventHandler(sc: SparkContext) extends EventHandler[KyuubiEvent] {
27+
override def apply(event: KyuubiEvent): Unit = {
28+
SparkContextHelper.postEventToSparkListenerBus(event, sc)
29+
}
30+
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.spark.events.handler
19+
20+
import org.apache.hadoop.conf.Configuration
21+
22+
import org.apache.kyuubi.config.{ConfigEntry, KyuubiConf}
23+
import org.apache.kyuubi.events.handler.JsonLoggingEventHandler
24+
25+
case class SparkJsonLoggingEventHandler(
26+
logName: String,
27+
logPath: ConfigEntry[String],
28+
hadoopConf: Configuration,
29+
kyuubiConf: KyuubiConf)
30+
extends JsonLoggingEventHandler(logName, logPath, hadoopConf, kyuubiConf)

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Logging}
2929
import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS
3030
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
3131
import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
32-
import org.apache.kyuubi.events.EventLogging
32+
import org.apache.kyuubi.events.EventBus
3333
import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationState, OperationType}
3434
import org.apache.kyuubi.operation.OperationState.OperationState
3535
import org.apache.kyuubi.operation.log.OperationLog
@@ -51,7 +51,7 @@ class ExecuteStatement(
5151

5252
private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)
5353

54-
EventLogging.onEvent(SparkOperationEvent(this))
54+
EventBus.post(SparkOperationEvent(this))
5555

5656
override protected def resultSchema: StructType = {
5757
if (result == null || result.schema.isEmpty) {
@@ -155,7 +155,7 @@ class ExecuteStatement(
155155

156156
override def setState(newState: OperationState): Unit = {
157157
super.setState(newState)
158-
EventLogging.onEvent(
158+
EventBus.post(
159159
SparkOperationEvent(this, operationListener.getExecutionId))
160160
}
161161
}

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
2323
import org.apache.kyuubi.engine.spark.events.SessionEvent
2424
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
2525
import org.apache.kyuubi.engine.spark.udf.KDFRegistry
26-
import org.apache.kyuubi.events.EventLogging
26+
import org.apache.kyuubi.events.EventBus
2727
import org.apache.kyuubi.operation.{Operation, OperationHandle}
2828
import org.apache.kyuubi.session.{AbstractSession, SessionManager}
2929

@@ -56,7 +56,7 @@ class SparkSessionImpl(
5656
case (key, value) => setModifiableConfig(key, value)
5757
}
5858
KDFRegistry.registerAll(spark)
59-
EventLogging.onEvent(sessionEvent)
59+
EventBus.post(sessionEvent)
6060
super.open()
6161
}
6262

@@ -67,7 +67,7 @@ class SparkSessionImpl(
6767

6868
override def close(): Unit = {
6969
sessionEvent.endTime = System.currentTimeMillis()
70-
EventLogging.onEvent(sessionEvent)
70+
EventBus.post(sessionEvent)
7171
super.close()
7272
spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable(_))
7373
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closeILoop(handle)

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala

Lines changed: 6 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -29,16 +29,19 @@ import org.apache.spark.ui.SparkUI
2929

3030
import org.apache.kyuubi.Logging
3131
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
32-
import org.apache.kyuubi.events.{EventLogger, KyuubiEvent}
32+
import org.apache.kyuubi.events.KyuubiEvent
3333

3434
/**
3535
* A place to invoke non-public APIs of [[SparkContext]], anything to be added here need to
3636
* think twice
3737
*/
3838
object SparkContextHelper extends Logging {
3939

40-
def createSparkHistoryLogger(sc: SparkContext): EventLogger = {
41-
new SparkHistoryEventLogger(sc)
40+
def postEventToSparkListenerBus(event: KyuubiEvent, sc: SparkContext) {
41+
event match {
42+
case s: SparkListenerEvent => sc.listenerBus.post(s)
43+
case _ => // ignore
44+
}
4245
}
4346

4447
def getKvStore(sc: SparkContext): ElementTrackingStore = {
@@ -81,16 +84,3 @@ object SparkContextHelper extends Logging {
8184
}
8285

8386
}
84-
85-
/**
86-
* A [[EventLogger]] that logs everything to SparkHistory
87-
* @param sc SparkContext
88-
*/
89-
private class SparkHistoryEventLogger(sc: SparkContext) extends EventLogger {
90-
override def logEvent(kyuubiEvent: KyuubiEvent): Unit = {
91-
kyuubiEvent match {
92-
case s: SparkListenerEvent => sc.listenerBus.post(s)
93-
case _ => // ignore
94-
}
95-
}
96-
}
Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.kyuubi.engine.spark.events
18+
package org.apache.kyuubi.engine.spark.events.handler
1919

2020
import java.io.{BufferedReader, InputStreamReader}
2121
import java.nio.file.Paths
@@ -28,17 +28,18 @@ import org.scalatest.time.SpanSugar._
2828
import org.apache.kyuubi.Utils
2929
import org.apache.kyuubi.config.KyuubiConf
3030
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, WithSparkSQLEngine}
31+
import org.apache.kyuubi.engine.spark.events.{EngineEvent, SessionEvent}
3132
import org.apache.kyuubi.events.EventLoggerType._
3233
import org.apache.kyuubi.events.JsonProtocol
3334
import org.apache.kyuubi.operation.{HiveJDBCTestHelper, OperationHandle}
3435

35-
class SparkEventLoggingServiceSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
36+
class SparkJsonLoggingEventHandlerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
3637

3738
private val logRoot = "file://" + Utils.createTempDir().toString
3839
private val currentDate = Utils.getDateFromTimestamp(System.currentTimeMillis())
3940

4041
override def withKyuubiConf: Map[String, String] = Map(
41-
KyuubiConf.ENGINE_EVENT_LOGGERS.key -> s"$JSON,$SPARK",
42+
KyuubiConf.ENGINE_EVENT_LOGGERS.key -> s"$JSON",
4243
KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH.key -> logRoot,
4344
"spark.eventLog.enabled" -> "true",
4445
"spark.eventLog.dir" -> logRoot)
@@ -56,15 +57,13 @@ class SparkEventLoggingServiceSuite extends WithSparkSQLEngine with HiveJDBCTest
5657
"session",
5758
s"day=$currentDate",
5859
KyuubiSparkUtil.engineId + ".json")
59-
6060
val fileSystem: FileSystem = FileSystem.get(new Configuration())
6161
val fs: FSDataInputStream = fileSystem.open(new Path(engineEventPath.toString))
6262
val engineEventReader = new BufferedReader(new InputStreamReader(fs))
6363

6464
val readEvent =
6565
JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[EngineEvent])
6666
assert(readEvent.isInstanceOf[EngineEvent])
67-
6867
withJdbcStatement() { statement =>
6968
val table = engineEventPath.getParent
7069
val resultSet = statement.executeQuery(s"SELECT * FROM `json`.`$table`")

kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala

Lines changed: 0 additions & 59 deletions
This file was deleted.

0 commit comments

Comments (0)