From d4df31ced7dc8a2f1437e17122d2ba74766d7da5 Mon Sep 17 00:00:00 2001 From: Cheng Pan <379377944@qq.com> Date: Fri, 14 May 2021 17:56:19 +0800 Subject: [PATCH] [KYUUBI #637] [METRICS] Remove unnecessary Guava Cache ### _Why are the changes needed?_ `MetricRegistry` hold counters in `ConcurrentHashMap`, it's unnecessary to use Guava Cache. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request Closes #637 from pan3793/metrics. Closes #637 aee29f1 [Cheng Pan] remove synchronized a8d607b [Cheng Pan] [METRICS] Remove unnecessary Guava Cache Authored-by: Cheng Pan <379377944@qq.com> Signed-off-by: Kent Yao (cherry picked from commit d20a92e5ae766c729c4c5942958de3a4ad3699e0) Signed-off-by: Kent Yao --- .../org/apache/kyuubi/engine/EngineRef.scala | 9 ++--- .../kyuubi/operation/ExecuteStatement.scala | 6 +-- .../kyuubi/operation/KyuubiOperation.scala | 2 +- .../kyuubi/session/KyuubiSessionImpl.scala | 6 +-- .../kyuubi/session/KyuubiSessionManager.scala | 4 +- .../apache/kyuubi/metrics/MetricsSystem.scala | 40 +++++-------------- .../kyuubi/metrics/MetricsSystemSuite.scala | 4 +- 7 files changed, 24 insertions(+), 47 deletions(-) diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index 13c6af7ef0d..463b4e2ddac 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -138,7 +138,7 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI") conf.set(HA_ZK_NAMESPACE, engineSpace) val builder = new SparkProcessBuilder(appUser, conf) - MetricsSystem.tracing(_.incAndGetCount(ENGINE_TOTAL)) + MetricsSystem.tracing(_.incCount(ENGINE_TOTAL)) try { info(s"Launching engine:\n$builder") val process = builder.start @@ -150,16 +150,15 @@ private[kyuubi] class EngineRef private(conf: KyuubiConf, user: String, sessionI if (exitValue.get != 0) { val error = builder.getError MetricsSystem.tracing { ms => - ms.incAndGetCount(MetricRegistry.name(ENGINE_FAIL, appUser)) - ms.incAndGetCount( - MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName)) + ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser)) + ms.incCount(MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName)) } throw error } } if (started + timeout <= System.currentTimeMillis()) { process.destroyForcibly() - MetricsSystem.tracing(_.incAndGetCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser))) + MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser))) throw KyuubiSQLException( s"Timeout($timeout) to launched Spark with $builder", builder.getError) diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala index 0a4700cbbb3..0f002d5d4c1 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala @@ -66,8 +66,8 @@ class ExecuteStatement( private def executeStatement(): Unit = { try { MetricsSystem.tracing { ms => - ms.incAndGetCount(STATEMENT_OPEN) - ms.incAndGetCount(STATEMENT_TOTAL) + ms.incCount(STATEMENT_OPEN) + ms.incCount(STATEMENT_TOTAL) } val req = new TExecuteStatementReq(remoteSessionHandle, statement) @@ -154,7 +154,7 @@ class ExecuteStatement( } override def close(): Unit = { - MetricsSystem.tracing(_.decAndGetCount(STATEMENT_OPEN)) + MetricsSystem.tracing(_.decCount(STATEMENT_OPEN)) super.close() } } diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala index ebe59fc5b46..37c269ec0f3 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala @@ -52,7 +52,7 @@ abstract class KyuubiOperation( } else { val errorType = e.getClass.getSimpleName MetricsSystem.tracing { - _.incAndGetCount(MetricRegistry.name(STATEMENT_FAIL, errorType)) + _.incCount(MetricRegistry.name(STATEMENT_FAIL, errorType)) } setState(OperationState.ERROR) val ke = e match { diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 14be45d2cd0..b2ece0a651d 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -58,8 +58,8 @@ class KyuubiSessionImpl( override def open(): Unit = { MetricsSystem.tracing { ms => - ms.incAndGetCount(CONN_TOTAL) - ms.incAndGetCount(MetricRegistry.name(CONN_OPEN, user)) + ms.incCount(CONN_TOTAL) + ms.incCount(MetricRegistry.name(CONN_OPEN, user)) } super.open() withZkClient(sessionConf) { zkClient => @@ -104,7 +104,7 @@ class KyuubiSessionImpl( case e: TException => throw KyuubiSQLException("Error while cleaning up the engine resources", e) } finally { - MetricsSystem.tracing(_.decAndGetCount(MetricRegistry.name(CONN_OPEN, user))) + MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user))) client = null if (transport != null) { transport.close() diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index f68ed6ef680..93030f0bf36 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -65,8 +65,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { } catch { case e: Throwable => MetricsSystem.tracing { ms => - ms.incAndGetCount(CONN_FAIL) - ms.incAndGetCount(MetricRegistry.name(CONN_FAIL, user)) + ms.incCount(CONN_FAIL) + ms.incCount(MetricRegistry.name(CONN_FAIL, user)) } try { sessionImpl.close() diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala index 81186392a96..ad66b22274a 100644 --- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala +++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala @@ -18,11 +18,9 @@ package org.apache.kyuubi.metrics import java.lang.management.ManagementFactory -import java.util.concurrent.locks.ReentrantLock -import com.codahale.metrics.{Counter, Gauge, MetricRegistry} -import com.codahale.metrics.jvm.{BufferPoolMetricSet, GarbageCollectorMetricSet, MemoryUsageGaugeSet, ThreadStatesGaugeSet} -import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import com.codahale.metrics.{Gauge, MetricRegistry} +import com.codahale.metrics.jvm._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.metrics.MetricsConf.METRICS_REPORTERS @@ -33,29 +31,15 @@ import org.apache.kyuubi.service.CompositeService class MetricsSystem extends CompositeService("MetricsSystem") { private val registry = new MetricRegistry - private var counters: LoadingCache[String, Counter] = _ - private val countersLock = new ReentrantLock - - def incAndGetCount(key: String): Long = { - try { - countersLock.lock() - val counter = counters.get(key) - counter.inc(1L) - counter.getCount - } finally { - countersLock.unlock() - } + + def incCount(key: String): Unit = { + val counter = registry.counter(key) + counter.inc(1L) } - def decAndGetCount(key: String): Long = { - try { - countersLock.lock() - val counter = counters.get(key) - counter.dec(1L) - counter.getCount - } finally { - countersLock.unlock() - } + def decCount(key: String): Unit = { + val counter = registry.counter(key) + counter.dec(1L) } def registerGauge[T](name: String, value: => T, default: T): Unit = { @@ -70,12 +54,6 @@ class MetricsSystem extends CompositeService("MetricsSystem") { registry.registerAll(new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer)) registry.registerAll(new ThreadStatesGaugeSet) - counters = CacheBuilder.newBuilder().build[String, Counter]( - new CacheLoader[String, Counter] { - override def load(key: String): Counter = registry.counter(key) - } - ) - conf.get(METRICS_REPORTERS).map(ReporterType.withName).foreach { case JSON => addService(new JsonReporterService(registry)) case SLF4J => addService(new Slf4jReporterService(registry)) diff --git a/kyuubi-metrics/src/test/scala/org/apache/kyuubi/metrics/MetricsSystemSuite.scala b/kyuubi-metrics/src/test/scala/org/apache/kyuubi/metrics/MetricsSystemSuite.scala index a57299eac06..dac09cd796c 100644 --- a/kyuubi-metrics/src/test/scala/org/apache/kyuubi/metrics/MetricsSystemSuite.scala +++ b/kyuubi-metrics/src/test/scala/org/apache/kyuubi/metrics/MetricsSystemSuite.scala @@ -84,10 +84,10 @@ class MetricsSystemSuite extends KyuubiFunSuite { metricsSystem.start() val reportFile = Paths.get(reportPath.toString, "report.json") checkJsonFileMetrics(reportFile, "PS-MarkSweep.count") - metricsSystem.incAndGetCount(MetricsConstants.STATEMENT_TOTAL) + metricsSystem.incCount(MetricsConstants.STATEMENT_TOTAL) checkJsonFileMetrics(reportFile, MetricsConstants.STATEMENT_TOTAL) - metricsSystem.decAndGetCount(MetricsConstants.STATEMENT_TOTAL) + metricsSystem.decCount(MetricsConstants.STATEMENT_TOTAL) metricsSystem.registerGauge(MetricsConstants.CONN_OPEN, 20181117, 0) checkJsonFileMetrics(reportFile, MetricsConstants.CONN_OPEN) checkJsonFileMetrics(reportFile, "20181117")