Skip to content

Commit 9ac5faa

Browse files
zhaomin1423ulysses-you
authored andcommitted
[KYUUBI #2028][FOLLOWUP] add engine stop event and fix the partition of initialized event
### _Why are the changes needed?_ add engine stop event; fix initialized event partition; ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2421 from zhaomin1423/fix_hive_engine. Closes #2028 a967631 [Min Zhao] add engineStarTime e7815bf [Min Zhao] add engineStarTime 953a9d8 [Min Zhao] post stop event in stopServer 0ec6911 [Min Zhao] fix conflict 4bf4159 [Min Zhao] fix conflict Authored-by: Min Zhao <zhaomin1423@163.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent 03d4bbe commit 9ac5faa

File tree

5 files changed

+31
-17
lines changed

5 files changed

+31
-17
lines changed

externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,20 @@ import org.apache.hadoop.hive.conf.HiveConf
2626
import org.apache.kyuubi.{Logging, Utils}
2727
import org.apache.kyuubi.config.KyuubiConf
2828
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_EVENT_JSON_LOG_PATH, ENGINE_EVENT_LOGGERS}
29+
import org.apache.kyuubi.engine.hive.HiveSQLEngine.currentEngine
2930
import org.apache.kyuubi.engine.hive.events.HiveEngineEvent
3031
import org.apache.kyuubi.engine.hive.events.handler.HiveJsonLoggingEventHandler
3132
import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent}
3233
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
3334
import org.apache.kyuubi.ha.client.RetryPolicies
34-
import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable}
35+
import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState}
3536
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
3637

3738
class HiveSQLEngine extends Serverable("HiveSQLEngine") {
3839
override val backendService: AbstractBackendService = new HiveBackendService(this)
3940
override val frontendServices: Seq[AbstractFrontendService] =
4041
Seq(new HiveTBinaryFrontendService(this))
42+
private[hive] val engineStartTime = System.currentTimeMillis()
4143

4244
override def start(): Unit = {
4345
super.start()
@@ -47,6 +49,12 @@ class HiveSQLEngine extends Serverable("HiveSQLEngine") {
4749
}
4850

4951
override protected def stopServer(): Unit = {
52+
currentEngine.foreach { engine =>
53+
val event = HiveEngineEvent(engine)
54+
.copy(state = ServiceState.STOPPED, endTime = System.currentTimeMillis())
55+
EventBus.post(event)
56+
}
57+
5058
// #2351
5159
// https://issues.apache.org/jira/browse/HIVE-23164
5260
// Server is not properly terminated because of non-daemon threads
@@ -131,7 +139,8 @@ object HiveSQLEngine extends Logging {
131139
case t: Throwable => currentEngine match {
132140
case Some(engine) =>
133141
engine.stop()
134-
val event = HiveEngineEvent(engine).copy(diagnostic = t.getMessage)
142+
val event = HiveEngineEvent(engine)
143+
.copy(endTime = System.currentTimeMillis(), diagnostic = t.getMessage)
135144
EventBus.post(event)
136145
case _ =>
137146
error(s"Failed to start Hive SQL engine: ${t.getMessage}.", t)

externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/events/HiveEngineEvent.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ case class HiveEngineEvent(
3030
diagnostic: String,
3131
settings: Map[String, String]) extends KyuubiEvent {
3232

33-
override def partitions: Seq[(String, String)] =
33+
override def partitions: Seq[(String, String)] = {
3434
("day", Utils.getDateFromTimestamp(startTime)) :: Nil
35+
}
3536

3637
override def toString: String = {
3738
s"""
@@ -50,7 +51,6 @@ case class HiveEngineEvent(
5051
object HiveEngineEvent {
5152

5253
def apply(engine: HiveSQLEngine): HiveEngineEvent = {
53-
engine.getStartTime
5454
val connectionUrl =
5555
if (engine.getServiceState.equals(ServiceState.LATENT)) {
5656
null
@@ -60,7 +60,7 @@ object HiveEngineEvent {
6060

6161
new HiveEngineEvent(
6262
connectionUrl = connectionUrl,
63-
startTime = engine.getStartTime,
63+
startTime = engine.engineStartTime,
6464
endTime = -1L,
6565
state = engine.getServiceState,
6666
diagnostic = "",

externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/events/SessionEvent.scala renamed to externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/events/HiveSessionEvent.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.kyuubi.events.KyuubiEvent
3030
* @param serverIp Kyuubi Server IP address
3131
* @param totalOperations how many queries and meta calls
3232
*/
33-
case class SessionEvent(
33+
case class HiveSessionEvent(
3434
sessionId: String,
3535
engineId: String,
3636
username: String,
@@ -45,10 +45,10 @@ case class SessionEvent(
4545

4646
}
4747

48-
object SessionEvent {
48+
object HiveSessionEvent {
4949

50-
def apply(session: HiveSessionImpl): SessionEvent = {
51-
new SessionEvent(
50+
def apply(session: HiveSessionImpl): HiveSessionEvent = {
51+
new HiveSessionEvent(
5252
session.handle.identifier.toString,
5353
engineId = "",
5454
session.user,

externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
2424
import org.apache.hive.service.cli.session.HiveSession
2525
import org.apache.hive.service.rpc.thrift.TProtocolVersion
2626

27-
import org.apache.kyuubi.engine.hive.events.SessionEvent
27+
import org.apache.kyuubi.engine.hive.events.HiveSessionEvent
2828
import org.apache.kyuubi.events.EventBus
2929
import org.apache.kyuubi.operation.{Operation, OperationHandle}
3030
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
@@ -41,7 +41,7 @@ class HiveSessionImpl(
4141
val hive: HiveSession)
4242
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
4343

44-
private val sessionEvent = SessionEvent(this)
44+
private val sessionEvent = HiveSessionEvent(this)
4545

4646
def serverIpAddress(): String = serverIpAddress
4747

externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/event/HiveEventLoggingServiceSuite.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ import org.apache.kyuubi.Utils
2828
import org.apache.kyuubi.config.KyuubiConf
2929
import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH
3030
import org.apache.kyuubi.engine.hive.HiveSQLEngine
31-
import org.apache.kyuubi.engine.hive.events.{HiveEngineEvent, HiveOperationEvent, SessionEvent}
31+
import org.apache.kyuubi.engine.hive.events.{HiveEngineEvent, HiveOperationEvent, HiveSessionEvent}
3232
import org.apache.kyuubi.events.JsonProtocol
3333
import org.apache.kyuubi.operation.HiveJDBCTestHelper
34+
import org.apache.kyuubi.service.ServiceState
3435

3536
class HiveEventLoggingServiceSuite extends HiveJDBCTestHelper {
3637

@@ -69,9 +70,13 @@ class HiveEventLoggingServiceSuite extends HiveJDBCTestHelper {
6970
val fileSystem: FileSystem = FileSystem.get(new Configuration())
7071
val fs: FSDataInputStream = fileSystem.open(new Path(engineEventPath.toString))
7172
val engineEventReader = new BufferedReader(new InputStreamReader(fs))
72-
val readEvent =
73+
val initializedEvent =
7374
JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[HiveEngineEvent])
74-
assert(readEvent.isInstanceOf[HiveEngineEvent])
75+
assert(initializedEvent.asInstanceOf[HiveEngineEvent].state.equals(ServiceState.INITIALIZED))
76+
77+
val startedEvent =
78+
JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[HiveEngineEvent])
79+
assert(startedEvent.asInstanceOf[HiveEngineEvent].state.equals(ServiceState.STARTED))
7580
}
7681

7782
test("test session event logging") {
@@ -82,15 +87,15 @@ class HiveEventLoggingServiceSuite extends HiveJDBCTestHelper {
8287
assert(!catalogs.next())
8388
val sessionEventPath = Paths.get(
8489
logRoot,
85-
"session",
90+
"hive_session",
8691
s"day=$currentDate",
8792
s"Hive-$hostName.json")
8893
val fileSystem: FileSystem = FileSystem.get(new Configuration())
8994
val fs: FSDataInputStream = fileSystem.open(new Path(sessionEventPath.toString))
9095
val engineEventReader = new BufferedReader(new InputStreamReader(fs))
9196
val readEvent =
92-
JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[SessionEvent])
93-
assert(readEvent.isInstanceOf[SessionEvent])
97+
JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[HiveSessionEvent])
98+
assert(readEvent.isInstanceOf[HiveSessionEvent])
9499
}
95100
}
96101

0 commit comments

Comments
 (0)