Skip to content

Commit fb0c3a9

Browse files
zhenjiaguoyaooqinn
authored andcommitted
[KYUUBI #1696] Add fetch logs and results rows rate
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Add fetch results and logs rows rate metric. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1707 from zhenjiaguo/add_fetch_logs_and_result. Closes #1696 3b7d1fc [zhenjiaguo] Merge branch 'add_fetch_logs_and_result' of github.com:zhenjiaguo/kyuubi into add_fetch_logs_and_result 653e3b0 [zhenjiaguo] comment e93d595 [zhenjiaguo] rerun action 6db7a76 [zhenjiaguo] add rowset not null judgment eff57de [zhenjiaguo] change fetch name 5543b92 [zhenjiaguo] add fetch logs and results rate b99f5a6 [zhenjiaguo] comment 3703e35 [zhenjiaguo] rerun action 082f41a [zhenjiaguo] add rowset not null judgment 1ca562b [zhenjiaguo] change fetch name c6fef42 [zhenjiaguo] add fetch logs and results rate Authored-by: zhenjiaguo <zhenjiaguo@gmail.com> Signed-off-by: Kent Yao <yao@apache.org>
1 parent 4f2696d commit fb0c3a9

File tree

5 files changed

+57
-16
lines changed

5 files changed

+57
-16
lines changed

kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ object MetricsConstants {
4141
final val OPERATION_TOTAL: String = OPERATION + "total"
4242

4343
final private val BACKEND_SERVICE = KYUUBI + "backend_service."
44+
final val BS_FETCH_LOG_ROWS_RATE = BACKEND_SERVICE + "fetch_log_rows_rate"
45+
final val BS_FETCH_RESULT_ROWS_RATE = BACKEND_SERVICE + "fetch_result_rows_rate"
4446
final val BS_OPEN_SESSION = BACKEND_SERVICE + "open_session"
4547
final val BS_CLOSE_SESSION = BACKEND_SERVICE + "close_session"
4648
final val BS_GET_INFO = BACKEND_SERVICE + "get_info"

kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ class MetricsSystem extends CompositeService("MetricsSystem") {
5353
timer.update(duration, unit)
5454
}
5555

56+
def markMeter(key: String, value: Long = 1): Unit = {
57+
val meter = registry.meter(key)
58+
meter.mark(value)
59+
}
60+
5661
def registerGauge[T](name: String, value: => T, default: T): Unit = {
5762
registry.register(
5863
MetricRegistry.name(name),
@@ -105,5 +110,4 @@ object MetricsSystem {
105110
tracing(_.updateTimer(name, System.nanoTime() - startTime, TimeUnit.NANOSECONDS))
106111
}
107112
}
108-
109113
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala renamed to kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
2525
import org.apache.kyuubi.service.BackendService
2626
import org.apache.kyuubi.session.SessionHandle
2727

28-
trait BackendServiceTimeMetric extends BackendService {
28+
trait BackendServiceMetric extends BackendService {
2929

3030
abstract override def openSession(
3131
protocol: TProtocolVersion,
@@ -152,7 +152,28 @@ trait BackendServiceTimeMetric extends BackendService {
152152
maxRows: Int,
153153
fetchLog: Boolean): TRowSet = {
154154
MetricsSystem.timerTracing(MetricsConstants.BS_FETCH_RESULTS) {
155-
super.fetchResults(operationHandle, orientation, maxRows, fetchLog)
155+
val rowSet = super.fetchResults(operationHandle, orientation, maxRows, fetchLog)
156+
val rowsSize =
157+
if (rowSet.getColumnsSize > 0) {
158+
rowSet.getColumns.get(0).getFieldValue match {
159+
case t: TStringColumn => t.getValues.size()
160+
case t: TDoubleColumn => t.getValues.size()
161+
case t: TI64Column => t.getValues.size()
162+
case t: TI32Column => t.getValues.size()
163+
case t: TI16Column => t.getValues.size()
164+
case t: TBoolColumn => t.getValues.size()
165+
case t: TByteColumn => t.getValues.size()
166+
case t: TBinaryColumn => t.getValues.size()
167+
case _ => 0
168+
}
169+
} else rowSet.getRowsSize
170+
171+
MetricsSystem.tracing(_.markMeter(
172+
if (fetchLog) MetricsConstants.BS_FETCH_LOG_ROWS_RATE
173+
else MetricsConstants.BS_FETCH_RESULT_ROWS_RATE,
174+
rowsSize))
175+
176+
rowSet
156177
}
157178
}
158179

kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
136136
def this() = this(classOf[KyuubiServer].getSimpleName)
137137

138138
override val backendService: AbstractBackendService =
139-
new KyuubiBackendService() with BackendServiceTimeMetric
139+
new KyuubiBackendService() with BackendServiceMetric
140140

141141
override lazy val frontendServices: Seq[AbstractFrontendService] =
142142
conf.get(FRONTEND_PROTOCOLS).map(FrontendProtocols.withName).map {

kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceTimeMetricSuite.scala renamed to kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceMetricSuite.scala

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.kyuubi.config.KyuubiConf
2727
import org.apache.kyuubi.metrics.{MetricsConf, MetricsConstants}
2828
import org.apache.kyuubi.operation.HiveJDBCTestHelper
2929

30-
class BackendServiceTimeMetricSuite extends WithKyuubiServer with HiveJDBCTestHelper {
30+
class BackendServiceMetricSuite extends WithKyuubiServer with HiveJDBCTestHelper {
3131

3232
override protected def jdbcUrl: String = getJdbcUrl
3333

@@ -39,35 +39,49 @@ class BackendServiceTimeMetricSuite extends WithKyuubiServer with HiveJDBCTestHe
3939
.set(MetricsConf.METRICS_JSON_INTERVAL, Duration.ofMillis(100).toMillis)
4040
}
4141

42-
test("backend service method time metric test") {
42+
test("backend service metric test") {
4343
val objMapper = new ObjectMapper()
4444

4545
withJdbcStatement() { statement =>
46-
statement.execute("show databases")
46+
statement.executeQuery("CREATE TABLE stu_test(id int, name string) USING parquet")
47+
statement.execute("insert into stu_test values(1, 'a'), (2, 'b'), (3, 'c')")
4748
Thread.sleep(Duration.ofMillis(111).toMillis)
4849

4950
val res1 = objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile)
5051
assert(res1.has("timers"))
51-
val histograms1 = res1.get("timers")
52+
val timer1 = res1.get("timers")
5253
assert(
53-
histograms1.get(MetricsConstants.BS_EXECUTE_STATEMENT).get("count").asInt() == 1)
54+
timer1.get(MetricsConstants.BS_EXECUTE_STATEMENT).get("count").asInt() == 2)
5455
assert(
55-
histograms1.get(MetricsConstants.BS_EXECUTE_STATEMENT).get("mean").asDouble() > 0)
56+
timer1.get(MetricsConstants.BS_EXECUTE_STATEMENT).get("mean").asDouble() > 0)
5657

57-
statement.execute("show tables")
58+
assert(res1.has("meters"))
59+
val meters1 = res1.get("meters")
60+
val logRows1 = meters1.get(MetricsConstants.BS_FETCH_LOG_ROWS_RATE).get("count").asInt()
61+
assert(logRows1 > 0)
62+
63+
statement.execute("select * from stu_test limit 2")
64+
statement.getResultSet.next()
5865
Thread.sleep(Duration.ofMillis(111).toMillis)
5966

6067
val res2 = objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile)
61-
val histograms2 = res2.get("timers")
68+
val timer2 = res2.get("timers")
6269
assert(
63-
histograms2.get(MetricsConstants.BS_OPEN_SESSION).get("count").asInt() == 1)
70+
timer2.get(MetricsConstants.BS_OPEN_SESSION).get("count").asInt() == 1)
6471
assert(
65-
histograms2.get(MetricsConstants.BS_OPEN_SESSION).get("min").asInt() > 0)
66-
val execStatementNode2 = histograms2.get(MetricsConstants.BS_EXECUTE_STATEMENT)
67-
assert(execStatementNode2.get("count").asInt() == 2)
72+
timer2.get(MetricsConstants.BS_OPEN_SESSION).get("min").asInt() > 0)
73+
val execStatementNode2 = timer2.get(MetricsConstants.BS_EXECUTE_STATEMENT)
74+
assert(execStatementNode2.get("count").asInt() == 3)
6875
assert(
6976
execStatementNode2.get("max").asDouble() >= execStatementNode2.get("mean").asDouble() &&
7077
execStatementNode2.get("mean").asDouble() >= execStatementNode2.get("min").asDouble())
78+
79+
val meters2 =
80+
objMapper.readTree(Paths.get(reportPath.toString, "report.json").toFile).get("meters")
81+
assert(meters2.get(MetricsConstants.BS_FETCH_RESULT_ROWS_RATE).get("count").asInt() == 2)
82+
assert(meters2.get(MetricsConstants.BS_FETCH_LOG_ROWS_RATE).get("count").asInt() >= logRows1)
83+
84+
statement.executeQuery("DROP TABLE stu_test")
7185
}
7286
}
7387
}

0 commit comments

Comments
 (0)