Skip to content

Commit f3dc1fd

Browse files
zhenjiaguoyaooqinn
authored andcommitted
[KYUUBI #1586] Add time metric on each KyuubiBackendService method
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Add time metric on each` KyuubiBackendService` method can help us inspect Kyuubi Server running status inside. It can Indirect reflecting our RPC call time when using Kyuubi. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate ![Screenshot from 2021-12-19 12-26-39](https://user-images.githubusercontent.com/29809822/146663963-7e483f45-198c-4fd7-b039-211dcf26dde5.png) - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1588 from zhenjiaguo/add_be_method_metric. Closes #1586 a001eb9 [zhenjiaguo] Add time metric on each KyuubiBackendService method Authored-by: zhenjiaguo <zhenjia_guo@163.com> Signed-off-by: Kent Yao <yao@apache.org>
1 parent 01364e5 commit f3dc1fd

File tree

5 files changed

+267
-1
lines changed

5 files changed

+267
-1
lines changed

kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,22 @@ object MetricsConstants {
4040
final val OPERATION_FAIL: String = OPERATION + "failed"
4141
final val OPERATION_TOTAL: String = OPERATION + "total"
4242

43+
final private val BACKEND_SERVICE = KYUUBI + "backend_service."
44+
final val OPEN_SESSION_MS = BACKEND_SERVICE + "open_session_ms"
45+
final val CLOSE_SESSION_MS = BACKEND_SERVICE + "close_session_ms"
46+
final val GET_INFO_MS = BACKEND_SERVICE + "get_info_ms"
47+
final val EXECUTE_STATEMENT_MS = BACKEND_SERVICE + "execute_statement_ms"
48+
final val GET_TYPE_INFO_MS = BACKEND_SERVICE + "get_type_info_ms"
49+
final val GET_CATALOGS_MS = BACKEND_SERVICE + "get_catalogs_ms"
50+
final val GET_SCHEMAS_MS = BACKEND_SERVICE + "get_schemas_ms"
51+
final val GET_TABLES_MS = BACKEND_SERVICE + "get_tables_ms"
52+
final val GET_TABLE_TYPES_MS = BACKEND_SERVICE + "get_table_types_ms"
53+
final val GET_COLUMNS_MS = BACKEND_SERVICE + "get_columns_ms"
54+
final val GET_FUNCTIONS_MS = BACKEND_SERVICE + "get_functions_ms"
55+
final val GET_OPERATION_STATUS_MS = BACKEND_SERVICE + "get_operation_status_ms"
56+
final val CANCEL_OPERATION_MS = BACKEND_SERVICE + "cancel_operation_ms"
57+
final val CLOSE_OPERATION_MS = BACKEND_SERVICE + "close_operation_ms"
58+
final val GET_RESULT_SET_METADATA_MS = BACKEND_SERVICE + "get_result_set_metadata_ms"
59+
final val FETCH_RESULTS_MS = BACKEND_SERVICE + "fetch_results_ms"
60+
4361
}

kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ class MetricsSystem extends CompositeService("MetricsSystem") {
4242
counter.dec(1L)
4343
}
4444

45+
def updateHistogram(key: String, value: Long): Unit = {
46+
val histogram = registry.histogram(key)
47+
histogram.update(value)
48+
}
49+
4550
def registerGauge[T](name: String, value: => T, default: T): Unit = {
4651
registry.register(
4752
MetricRegistry.name(name),
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.server
19+
20+
import org.apache.hive.service.rpc.thrift._
21+
22+
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
23+
import org.apache.kyuubi.operation.{OperationHandle, OperationStatus}
24+
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
25+
import org.apache.kyuubi.service.BackendService
26+
import org.apache.kyuubi.session.SessionHandle
27+
28+
trait BackendServiceTimeMetric extends BackendService {
29+
30+
@throws[Exception]
31+
private def timeMetric[T](name: String)(f: => T): T = {
32+
val startTime = System.currentTimeMillis()
33+
try {
34+
f
35+
} finally {
36+
MetricsSystem.tracing(
37+
_.updateHistogram(name, System.currentTimeMillis() - startTime))
38+
}
39+
}
40+
41+
abstract override def openSession(
42+
protocol: TProtocolVersion,
43+
user: String,
44+
password: String,
45+
ipAddr: String,
46+
configs: Map[String, String]): SessionHandle = {
47+
timeMetric(MetricsConstants.OPEN_SESSION_MS) {
48+
super.openSession(protocol, user, password, ipAddr, configs)
49+
}
50+
}
51+
52+
abstract override def closeSession(sessionHandle: SessionHandle): Unit = {
53+
timeMetric(MetricsConstants.CLOSE_SESSION_MS) {
54+
super.closeSession(sessionHandle)
55+
}
56+
}
57+
58+
abstract override def getInfo(
59+
sessionHandle: SessionHandle,
60+
infoType: TGetInfoType): TGetInfoValue = {
61+
timeMetric(MetricsConstants.GET_INFO_MS) {
62+
super.getInfo(sessionHandle, infoType)
63+
}
64+
}
65+
66+
abstract override def executeStatement(
67+
sessionHandle: SessionHandle,
68+
statement: String,
69+
runAsync: Boolean,
70+
queryTimeout: Long): OperationHandle = {
71+
timeMetric(MetricsConstants.EXECUTE_STATEMENT_MS) {
72+
super.executeStatement(sessionHandle, statement, runAsync, queryTimeout)
73+
}
74+
}
75+
76+
abstract override def getTypeInfo(sessionHandle: SessionHandle): OperationHandle = {
77+
timeMetric(MetricsConstants.GET_TYPE_INFO_MS) {
78+
super.getTypeInfo(sessionHandle)
79+
}
80+
}
81+
82+
abstract override def getCatalogs(sessionHandle: SessionHandle): OperationHandle = {
83+
timeMetric(MetricsConstants.GET_CATALOGS_MS) {
84+
super.getCatalogs(sessionHandle)
85+
}
86+
}
87+
88+
abstract override def getSchemas(
89+
sessionHandle: SessionHandle,
90+
catalogName: String,
91+
schemaName: String): OperationHandle = {
92+
timeMetric(MetricsConstants.GET_SCHEMAS_MS) {
93+
super.getSchemas(sessionHandle, catalogName, schemaName)
94+
}
95+
}
96+
97+
abstract override def getTables(
98+
sessionHandle: SessionHandle,
99+
catalogName: String,
100+
schemaName: String,
101+
tableName: String,
102+
tableTypes: java.util.List[String]): OperationHandle = {
103+
timeMetric(MetricsConstants.GET_TABLES_MS) {
104+
super.getTables(sessionHandle, catalogName, schemaName, tableName, tableTypes)
105+
}
106+
}
107+
108+
abstract override def getTableTypes(sessionHandle: SessionHandle): OperationHandle = {
109+
timeMetric(MetricsConstants.GET_TABLE_TYPES_MS) {
110+
super.getTableTypes(sessionHandle)
111+
}
112+
}
113+
114+
abstract override def getColumns(
115+
sessionHandle: SessionHandle,
116+
catalogName: String,
117+
schemaName: String,
118+
tableName: String,
119+
columnName: String): OperationHandle = {
120+
timeMetric(MetricsConstants.GET_COLUMNS_MS) {
121+
super.getColumns(sessionHandle, catalogName, schemaName, tableName, columnName)
122+
}
123+
}
124+
125+
abstract override def getFunctions(
126+
sessionHandle: SessionHandle,
127+
catalogName: String,
128+
schemaName: String,
129+
functionName: String): OperationHandle = {
130+
timeMetric(MetricsConstants.GET_FUNCTIONS_MS) {
131+
super.getFunctions(sessionHandle, catalogName, schemaName, functionName)
132+
}
133+
}
134+
135+
abstract override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = {
136+
timeMetric(MetricsConstants.GET_OPERATION_STATUS_MS) {
137+
super.getOperationStatus(operationHandle)
138+
}
139+
}
140+
141+
abstract override def cancelOperation(operationHandle: OperationHandle): Unit = {
142+
timeMetric(MetricsConstants.CANCEL_OPERATION_MS) {
143+
super.cancelOperation(operationHandle)
144+
}
145+
}
146+
147+
abstract override def closeOperation(operationHandle: OperationHandle): Unit = {
148+
timeMetric(MetricsConstants.CLOSE_OPERATION_MS) {
149+
super.closeOperation(operationHandle)
150+
}
151+
}
152+
153+
abstract override def getResultSetMetadata(operationHandle: OperationHandle): TTableSchema = {
154+
timeMetric(MetricsConstants.GET_RESULT_SET_METADATA_MS) {
155+
super.getResultSetMetadata(operationHandle)
156+
}
157+
}
158+
159+
abstract override def fetchResults(
160+
operationHandle: OperationHandle,
161+
orientation: FetchOrientation,
162+
maxRows: Int,
163+
fetchLog: Boolean): TRowSet = {
164+
timeMetric(MetricsConstants.FETCH_RESULTS_MS) {
165+
super.fetchResults(operationHandle, orientation, maxRows, fetchLog)
166+
}
167+
}
168+
169+
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ class KyuubiServer(name: String) extends Serverable(name) {
135135

136136
def this() = this(classOf[KyuubiServer].getSimpleName)
137137

138-
override val backendService: AbstractBackendService = new KyuubiBackendService()
138+
override val backendService: AbstractBackendService =
139+
new KyuubiBackendService() with BackendServiceTimeMetric
139140

140141
override lazy val frontendServices: Seq[AbstractFrontendService] =
141142
conf.get(FRONTEND_PROTOCOLS).map(FrontendProtocols.withName).map {
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.server
19+
20+
import java.nio.file.{Path, Paths}
21+
import java.time.Duration
22+
23+
import com.fasterxml.jackson.databind.ObjectMapper
24+
25+
import org.apache.kyuubi.{Utils, WithKyuubiServer}
26+
import org.apache.kyuubi.config.KyuubiConf
27+
import org.apache.kyuubi.metrics.MetricsConf
28+
import org.apache.kyuubi.operation.HiveJDBCTestHelper
29+
30+
class BackendServiceTimeMetricSuite extends WithKyuubiServer with HiveJDBCTestHelper {
31+
32+
override protected def jdbcUrl: String = getJdbcUrl
33+
34+
val reportPath: Path = Utils.createTempDir()
35+
override protected val conf: KyuubiConf = {
36+
KyuubiConf()
37+
.set(MetricsConf.METRICS_REPORTERS, Seq("JSON"))
38+
.set(MetricsConf.METRICS_JSON_LOCATION, reportPath.toString)
39+
.set(MetricsConf.METRICS_JSON_INTERVAL, Duration.ofMillis(100).toMillis)
40+
}
41+
42+
test("backend service method time metric test") {
43+
val objMapper = new ObjectMapper()
44+
45+
withJdbcStatement() { statement =>
46+
statement.execute("show databases")
47+
Thread.sleep(Duration.ofMillis(111).toMillis)
48+
49+
val res1 = objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile)
50+
assert(res1.has("histograms"))
51+
val histograms1 = res1.get("histograms")
52+
assert(
53+
histograms1.get("kyuubi.backend_service.execute_statement_ms").get("count").asInt() == 1)
54+
assert(
55+
histograms1.get("kyuubi.backend_service.execute_statement_ms").get("mean").asDouble() > 0)
56+
57+
statement.execute("show tables")
58+
Thread.sleep(Duration.ofMillis(111).toMillis)
59+
60+
val res2 = objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile)
61+
val histograms2 = res2.get("histograms")
62+
assert(
63+
histograms2.get("kyuubi.backend_service.open_session_ms").get("count").asInt() == 1)
64+
assert(
65+
histograms2.get("kyuubi.backend_service.open_session_ms").get("min").asInt() > 0)
66+
val execStatementNode2 = histograms2.get("kyuubi.backend_service.execute_statement_ms")
67+
assert(execStatementNode2.get("count").asInt() == 2)
68+
assert(
69+
execStatementNode2.get("max").asDouble() >= execStatementNode2.get("mean").asDouble() &&
70+
execStatementNode2.get("mean").asDouble() >= execStatementNode2.get("min").asDouble())
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)