Skip to content

Commit

Permalink
Add Scheduler Queue Metric for Not Processing Any Activations (#5386)
Browse files Browse the repository at this point in the history
* Add Scheduler Queue Metric for Not Processing Any Activations

* fix timeout comparison

* account for action timeout being longer than queue retention

---------

Co-authored-by: Brendan Doyle <brendand@qualtrics.com>
  • Loading branch information
bdoyle0182 and Brendan Doyle committed Mar 7, 2023
1 parent 96ff327 commit 60ca660
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,13 @@ object LoggingMarkers {
counter,
Some(actionWithoutVersion),
Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none)
def SCHEDULER_QUEUE_NOT_PROCESSING(namespace: String, actionWithVersion: String, actionWithoutVersion: String) =
LogMarkerToken(
scheduler,
"queueNotProcessing",
counter,
Some(actionWithoutVersion),
Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none)

/*
* General markers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcu
import pureconfig.loadConfigOrThrow
import spray.json._
import pureconfig.generic.auto._
import scala.collection.JavaConverters._

import scala.collection.JavaConverters._
import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.collection.mutable
Expand Down Expand Up @@ -139,6 +139,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
checkToDropStaleActivation: (Clock,
Queue[TimeSeriesActivationEntry],
Long,
AtomicLong,
String,
WhiskActionMetaData,
MemoryQueueState,
Expand Down Expand Up @@ -173,6 +174,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,

private[queue] var queue = Queue.empty[TimeSeriesActivationEntry]
private[queue] var in = new AtomicInteger(0)
private[queue] val lastActivationPulledTime = new AtomicLong(Instant.now.toEpochMilli)
private[queue] val namespaceContainerCount = NamespaceContainerCount(invocationNamespace, etcdClient, watcherService)
private[queue] var averageDuration: Option[Double] = None
private[queue] var averageDurationBuffer = AverageRingBuffer(queueConfig.durationBufferSize)
Expand Down Expand Up @@ -574,7 +576,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
case Event(DropOld, _) =>
if (queue.nonEmpty && Duration
.between(queue.head.timestamp, clock.now())
.compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) {
.compareTo(Duration.ofMillis(actionRetentionTimeout)) >= 0) {
logging.error(
this,
s"[$invocationNamespace:$action:$stateName] Drop some stale activations for $revision, existing container is ${containers.size}, inProgress container is ${creationIds.size}, state data: $stateData, in is $in, current: ${queue.size}.")
Expand Down Expand Up @@ -920,6 +922,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
clock,
queue,
actionRetentionTimeout,
lastActivationPulledTime,
invocationNamespace,
actionMetaData,
stateName,
Expand Down Expand Up @@ -1024,6 +1027,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
MetricEmitter.emitHistogramMetric(
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion),
totalTimeInScheduler.toMillis)
lastActivationPulledTime.set(Instant.now.toEpochMilli)
res.trySuccess(Right(msg))
in.decrementAndGet()
stay
Expand All @@ -1049,6 +1053,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
MetricEmitter.emitHistogramMetric(
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion),
totalTimeInScheduler.toMillis)
lastActivationPulledTime.set(Instant.now.toEpochMilli)

sender ! GetActivationResponse(Right(msg))
tryDisableActionThrottling()
Expand Down Expand Up @@ -1186,6 +1191,7 @@ object MemoryQueue {
def checkToDropStaleActivation(clock: Clock,
queue: Queue[TimeSeriesActivationEntry],
maxRetentionMs: Long,
lastActivationExecutedTime: AtomicLong,
invocationNamespace: String,
actionMetaData: WhiskActionMetaData,
stateName: MemoryQueueState,
Expand All @@ -1201,6 +1207,13 @@ object MemoryQueue {
logging.info(
this,
s"[$invocationNamespace:$action:$stateName] some activations are stale msg: ${queue.head.msg.activationId}.")
val timeSinceLastActivationGrabbed = clock.now().toEpochMilli - lastActivationExecutedTime.get()
if (timeSinceLastActivationGrabbed > maxRetentionMs && timeSinceLastActivationGrabbed > actionMetaData.limits.timeout.millis) {
MetricEmitter.emitGaugeMetric(
LoggingMarkers
.SCHEDULER_QUEUE_NOT_PROCESSING(invocationNamespace, action.asString, action.toStringWithoutVersion),
1)
}

queueRef ! DropOld
}
Expand Down

0 comments on commit 60ca660

Please sign in to comment.