Skip to content

Commit

Permalink
Merge commit '38eb74c26ebbdbb57aba51ad18c7e4a6bb9e1144' into cache-do…
Browse files Browse the repository at this point in the history
…wnloaded-archives
  • Loading branch information
bowenliang123 committed Feb 17, 2023
2 parents 0cf2a75 + 38eb74c commit d253a49
Show file tree
Hide file tree
Showing 13 changed files with 67 additions and 35 deletions.
1 change: 1 addition & 0 deletions docs/monitor/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ These metrics include:
|--------------------------------------------------|----------------------------------------|-----------|-------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `kyuubi.exec.pool.threads.alive` | | gauge | 1.2.0 | <div style='width: 150pt;word-wrap: break-word;white-space: normal'> threads keepAlive in the backend executive thread pool</div> |
| `kyuubi.exec.pool.threads.active` | | gauge | 1.2.0 | <div style='width: 150pt;word-wrap: break-word;white-space: normal'> threads active in the backend executive thread pool</div> |
| `kyuubi.exec.pool.work_queue.size` | | gauge | 1.7.0 | <div style='width: 150pt;word-wrap: break-word;white-space: normal'> work queue size in the backend executive thread pool</div> |
| `kyuubi.connection.total` | | counter | 1.2.0 | <div style='width: 150pt;word-wrap: break-word;white-space: normal'> cumulative connection count</div> |
| `kyuubi.connection.total` | `${sessionType}` | counter | 1.7.0 | <div style='width: 150pt;word-wrap: break-word;white-space: normal'> cumulative connection count with session type `${sessionType}`</div> |
| `kyuubi.connection.opened` | | gauge | 1.2.0 | <div style='width: 150pt;word-wrap: break-word;white-space: normal'> current active connection count</div> |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
<strong>Background execution pool threads active: </strong>
{engine.backendService.sessionManager.getActiveCount}
</li>
<li>
<strong>Background execution pool work queue size: </strong>
{engine.backendService.sessionManager.getWorkQueueSize}
</li>
}.getOrElse(Seq.empty)
}
</ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class Log4j12DivertAppender extends WriterAppender {
setLayout(lo)

addFilter { _: LoggingEvent =>
if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL
if (OperationLog.getCurrentOperationLog.isDefined) Filter.NEUTRAL else Filter.DENY
}

/**
Expand All @@ -51,8 +51,7 @@ class Log4j12DivertAppender extends WriterAppender {
// That should've gone into our writer. Notify the LogContext.
val logOutput = writer.toString
writer.reset()
val log = OperationLog.getCurrentOperationLog
if (log != null) log.write(logOutput)
OperationLog.getCurrentOperationLog.foreach(_.write(logOutput))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kyuubi.operation.log

import java.io.CharArrayWriter
import java.util.concurrent.locks.ReadWriteLock

import scala.collection.JavaConverters._

Expand All @@ -27,6 +28,8 @@ import org.apache.logging.log4j.core.appender.{AbstractWriterAppender, ConsoleAp
import org.apache.logging.log4j.core.filter.AbstractFilter
import org.apache.logging.log4j.core.layout.PatternLayout

import org.apache.kyuubi.reflection.DynFields

class Log4j2DivertAppender(
name: String,
layout: StringLayout,
Expand All @@ -52,34 +55,35 @@ class Log4j2DivertAppender(

addFilter(new AbstractFilter() {
override def filter(event: LogEvent): Filter.Result = {
if (OperationLog.getCurrentOperationLog == null) {
Filter.Result.DENY
} else {
if (OperationLog.getCurrentOperationLog.isDefined) {
Filter.Result.NEUTRAL
} else {
Filter.Result.DENY
}
}
})

def initLayout(): StringLayout = {
LogManager.getRootLogger.asInstanceOf[org.apache.logging.log4j.core.Logger]
.getAppenders.values().asScala
.find(ap => ap.isInstanceOf[ConsoleAppender] && ap.getLayout.isInstanceOf[StringLayout])
.map(_.getLayout.asInstanceOf[StringLayout])
.getOrElse(PatternLayout.newBuilder().withPattern(
"%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n").build())
}
private val writeLock = DynFields.builder()
.hiddenImpl(classOf[AbstractWriterAppender[_]], "readWriteLock")
.build[ReadWriteLock](this)
.get()
.writeLock

/**
* Overrides AbstractWriterAppender.append(), which does the real logging. No need
* to worry about concurrency since log4j calls this synchronously.
*/
override def append(event: LogEvent): Unit = {
super.append(event)
// That should've gone into our writer. Notify the LogContext.
val logOutput = writer.toString
writer.reset()
val log = OperationLog.getCurrentOperationLog
if (log != null) log.write(logOutput)
writeLock.lock()
try {
// That should've gone into our writer. Notify the LogContext.
val logOutput = writer.toString
writer.reset()
OperationLog.getCurrentOperationLog.foreach(_.write(logOutput))
} finally {
writeLock.unlock()
}
}
}

Expand All @@ -95,7 +99,7 @@ object Log4j2DivertAppender {

def initialize(): Unit = {
val ap = new Log4j2DivertAppender()
org.apache.logging.log4j.LogManager.getRootLogger()
org.apache.logging.log4j.LogManager.getRootLogger
.asInstanceOf[org.apache.logging.log4j.core.Logger].addAppender(ap)
ap.start()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object OperationLog extends Logging {
OPERATION_LOG.set(operationLog)
}

def getCurrentOperationLog: OperationLog = OPERATION_LOG.get()
def getCurrentOperationLog: Option[OperationLog] = Option(OPERATION_LOG.get)

def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
execPool.getActiveCount
}

def getWorkQueueSize: Int = {
assert(execPool != null)
execPool.getQueue.size()
}

private var _confRestrictList: Set[String] = _
private var _confIgnoreList: Set[String] = _
private var _batchConfIgnoreList: Set[String] = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ class OperationLogSuite extends KyuubiFunSuite {
assert(!Files.exists(logFile))

OperationLog.setCurrentOperationLog(operationLog)
assert(OperationLog.getCurrentOperationLog === operationLog)
assert(OperationLog.getCurrentOperationLog === Some(operationLog))

OperationLog.removeCurrentOperationLog()
assert(OperationLog.getCurrentOperationLog === null)
assert(OperationLog.getCurrentOperationLog.isEmpty)

operationLog.write(msg1 + "\n")
assert(Files.exists(logFile))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ object MetricsConstants {

final val EXEC_POOL_ALIVE: String = KYUUBI + "exec.pool.threads.alive"
final val EXEC_POOL_ACTIVE: String = KYUUBI + "exec.pool.threads.active"
final val EXEC_POOL_WORK_QUEUE_SIZE: String = KYUUBI + "exec.pool.work_queue.size"

final private val CONN = KYUUBI + "connection."
final private val THRIFT_HTTP_CONN = KYUUBI + "thrift.http.connection."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
public class ExecPoolStatistic {
private int execPoolSize;
private int execPoolActiveCount;
private int execPoolWorkQueueSize;

public ExecPoolStatistic() {}

public ExecPoolStatistic(int execPoolSize, int execPoolActiveCount) {
public ExecPoolStatistic(int execPoolSize, int execPoolActiveCount, int execPoolWorkQueueSize) {
this.execPoolSize = execPoolSize;
this.execPoolActiveCount = execPoolActiveCount;
this.execPoolWorkQueueSize = execPoolWorkQueueSize;
}

public int getExecPoolSize() {
Expand All @@ -48,18 +50,27 @@ public void setExecPoolActiveCount(int execPoolActiveCount) {
this.execPoolActiveCount = execPoolActiveCount;
}

public int getExecPoolWorkQueueSize() {
return execPoolWorkQueueSize;
}

public void setExecPoolWorkQueueSize(int execPoolWorkQueueSize) {
this.execPoolWorkQueueSize = execPoolWorkQueueSize;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExecPoolStatistic that = (ExecPoolStatistic) o;
return getExecPoolSize() == that.getExecPoolSize()
&& getExecPoolActiveCount() == that.getExecPoolActiveCount();
&& getExecPoolActiveCount() == that.getExecPoolActiveCount()
&& getExecPoolWorkQueueSize() == that.getExecPoolWorkQueueSize();
}

@Override
public int hashCode() {
return Objects.hash(getExecPoolSize(), getExecPoolActiveCount());
return Objects.hash(getExecPoolSize(), getExecPoolActiveCount(), getExecPoolWorkQueueSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.kyuubi.server.api.v1

import java.io.InputStream
import java.util.Locale
import java.util
import java.util.{Collections, Locale}
import java.util.concurrent.ConcurrentHashMap
import javax.ws.rs._
import javax.ws.rs.core.MediaType
Expand Down Expand Up @@ -318,12 +319,16 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession =>
try {
val submissionOp = batchSession.batchJobSubmissionOp
val rowSet = submissionOp.getOperationLogRowSet(
FetchOrientation.FETCH_NEXT,
from,
size)
val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
new OperationLog(logRowSet.asJava, logRowSet.size)
val rowSet = submissionOp.getOperationLogRowSet(FetchOrientation.FETCH_NEXT, from, size)
val columns = rowSet.getColumns
val logRowSet: util.List[String] =
if (columns == null || columns.size == 0) {
Collections.emptyList()
} else {
assert(columns.size == 1)
columns.get(0).getStringVal.getValues
}
new OperationLog(logRowSet, logRowSet.size)
} catch {
case NonFatal(e) =>
val errorMsg = s"Error getting operation log for batchId: $batchId"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ private[v1] class SessionsResource extends ApiRequestContext with Logging {
def execPoolStatistic(): ExecPoolStatistic = {
new ExecPoolStatistic(
sessionManager.getExecPoolSize,
sessionManager.getActiveCount)
sessionManager.getActiveCount,
sessionManager.getWorkQueueSize)
}

@ApiResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
ms.registerGauge(CONN_OPEN, getOpenSessionCount, 0)
ms.registerGauge(EXEC_POOL_ALIVE, getExecPoolSize, 0)
ms.registerGauge(EXEC_POOL_ACTIVE, getActiveCount, 0)
ms.registerGauge(EXEC_POOL_WORK_QUEUE_SIZE, getWorkQueueSize, 0)
}
super.start()
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@
<jars.target.dir>${project.build.directory}/scala-${scala.binary.version}/jars</jars.target.dir>

<maven.plugin.build.helper.version>3.3.0</maven.plugin.build.helper.version>
<maven.plugin.download.version>1.6.6</maven.plugin.download.version>
<maven.plugin.download.version>1.6.8</maven.plugin.download.version>
<maven.plugin.download.cache.path></maven.plugin.download.cache.path>
<maven.plugin.enforcer.mojo.rules.version>1.6.1</maven.plugin.enforcer.mojo.rules.version>
<maven.plugin.scala.version>4.8.0</maven.plugin.scala.version>
Expand Down

0 comments on commit d253a49

Please sign in to comment.