Skip to content

Commit

Permalink
[KYUUBI #637] [METRICS] Remove unnecessary Guava Cache
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/NetEase/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
`MetricRegistry` hold counters in `ConcurrentHashMap`, it's unnecessary to use Guava Cache.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request

Closes #637 from pan3793/metrics.

Closes #637

aee29f1 [Cheng Pan] remove synchronized
a8d607b [Cheng Pan] [METRICS] Remove unnecessary Guava Cache

Authored-by: Cheng Pan <379377944@qq.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit d20a92e)
Signed-off-by: Kent Yao <yao@apache.org>
  • Loading branch information
pan3793 authored and yaooqinn committed May 14, 2021
1 parent cb1b5d8 commit d4df31c
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
conf.set(HA_ZK_NAMESPACE, engineSpace)
val builder = new SparkProcessBuilder(appUser, conf)
MetricsSystem.tracing(_.incAndGetCount(ENGINE_TOTAL))
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
try {
info(s"Launching engine:\n$builder")
val process = builder.start
Expand All @@ -150,16 +150,15 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI
if (exitValue.get != 0) {
val error = builder.getError
MetricsSystem.tracing { ms =>
ms.incAndGetCount(MetricRegistry.name(ENGINE_FAIL, appUser))
ms.incAndGetCount(
MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName))
ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))
ms.incCount(MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName))
}
throw error
}
}
if (started + timeout <= System.currentTimeMillis()) {
process.destroyForcibly()
MetricsSystem.tracing(_.incAndGetCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
throw KyuubiSQLException(
s"Timeout($timeout) to launched Spark with $builder",
builder.getError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class ExecuteStatement(
private def executeStatement(): Unit = {
try {
MetricsSystem.tracing { ms =>
ms.incAndGetCount(STATEMENT_OPEN)
ms.incAndGetCount(STATEMENT_TOTAL)
ms.incCount(STATEMENT_OPEN)
ms.incCount(STATEMENT_TOTAL)
}

val req = new TExecuteStatementReq(remoteSessionHandle, statement)
Expand Down Expand Up @@ -154,7 +154,7 @@ class ExecuteStatement(
}

override def close(): Unit = {
MetricsSystem.tracing(_.decAndGetCount(STATEMENT_OPEN))
MetricsSystem.tracing(_.decCount(STATEMENT_OPEN))
super.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ abstract class KyuubiOperation(
} else {
val errorType = e.getClass.getSimpleName
MetricsSystem.tracing {
_.incAndGetCount(MetricRegistry.name(STATEMENT_FAIL, errorType))
_.incCount(MetricRegistry.name(STATEMENT_FAIL, errorType))
}
setState(OperationState.ERROR)
val ke = e match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ class KyuubiSessionImpl(

override def open(): Unit = {
MetricsSystem.tracing { ms =>
ms.incAndGetCount(CONN_TOTAL)
ms.incAndGetCount(MetricRegistry.name(CONN_OPEN, user))
ms.incCount(CONN_TOTAL)
ms.incCount(MetricRegistry.name(CONN_OPEN, user))
}
super.open()
withZkClient(sessionConf) { zkClient =>
Expand Down Expand Up @@ -104,7 +104,7 @@ class KyuubiSessionImpl(
case e: TException =>
throw KyuubiSQLException("Error while cleaning up the engine resources", e)
} finally {
MetricsSystem.tracing(_.decAndGetCount(MetricRegistry.name(CONN_OPEN, user)))
MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
client = null
if (transport != null) {
transport.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
} catch {
case e: Throwable =>
MetricsSystem.tracing { ms =>
ms.incAndGetCount(CONN_FAIL)
ms.incAndGetCount(MetricRegistry.name(CONN_FAIL, user))
ms.incCount(CONN_FAIL)
ms.incCount(MetricRegistry.name(CONN_FAIL, user))
}
try {
sessionImpl.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package org.apache.kyuubi.metrics

import java.lang.management.ManagementFactory
import java.util.concurrent.locks.ReentrantLock

import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
import com.codahale.metrics.jvm.{BufferPoolMetricSet, GarbageCollectorMetricSet, MemoryUsageGaugeSet, ThreadStatesGaugeSet}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.codahale.metrics.{Gauge, MetricRegistry}
import com.codahale.metrics.jvm._

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.metrics.MetricsConf.METRICS_REPORTERS
Expand All @@ -33,29 +31,15 @@ import org.apache.kyuubi.service.CompositeService
class MetricsSystem extends CompositeService("MetricsSystem") {

private val registry = new MetricRegistry
private var counters: LoadingCache[String, Counter] = _
private val countersLock = new ReentrantLock

def incAndGetCount(key: String): Long = {
try {
countersLock.lock()
val counter = counters.get(key)
counter.inc(1L)
counter.getCount
} finally {
countersLock.unlock()
}

def incCount(key: String): Unit = {
val counter = registry.counter(key)
counter.inc(1L)
}

def decAndGetCount(key: String): Long = {
try {
countersLock.lock()
val counter = counters.get(key)
counter.dec(1L)
counter.getCount
} finally {
countersLock.unlock()
}
def decCount(key: String): Unit = {
val counter = registry.counter(key)
counter.dec(1L)
}

def registerGauge[T](name: String, value: => T, default: T): Unit = {
Expand All @@ -70,12 +54,6 @@ class MetricsSystem extends CompositeService("MetricsSystem") {
registry.registerAll(new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer))
registry.registerAll(new ThreadStatesGaugeSet)

counters = CacheBuilder.newBuilder().build[String, Counter](
new CacheLoader[String, Counter] {
override def load(key: String): Counter = registry.counter(key)
}
)

conf.get(METRICS_REPORTERS).map(ReporterType.withName).foreach {
case JSON => addService(new JsonReporterService(registry))
case SLF4J => addService(new Slf4jReporterService(registry))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ class MetricsSystemSuite extends KyuubiFunSuite {
metricsSystem.start()
val reportFile = Paths.get(reportPath.toString, "report.json")
checkJsonFileMetrics(reportFile, "PS-MarkSweep.count")
metricsSystem.incAndGetCount(MetricsConstants.STATEMENT_TOTAL)
metricsSystem.incCount(MetricsConstants.STATEMENT_TOTAL)

checkJsonFileMetrics(reportFile, MetricsConstants.STATEMENT_TOTAL)
metricsSystem.decAndGetCount(MetricsConstants.STATEMENT_TOTAL)
metricsSystem.decCount(MetricsConstants.STATEMENT_TOTAL)
metricsSystem.registerGauge(MetricsConstants.CONN_OPEN, 20181117, 0)
checkJsonFileMetrics(reportFile, MetricsConstants.CONN_OPEN)
checkJsonFileMetrics(reportFile, "20181117")
Expand Down

0 comments on commit d4df31c

Please sign in to comment.