diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index cfa7845275b..6b2647b88b9 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -70,6 +70,14 @@ kamon { prometheus { # We expose the metrics endpoint over akka http. So default server is disabled start-embedded-http-server = no + + buckets { + custom { + //By default retries are configured up to 9. However for certain setups we may increase + //it to higher values + "histogram.cosmosdb_retry_success" = [1, 2, 3, 5, 7, 10, 12, 15, 20] + } + } } reporters = [ @@ -244,6 +252,10 @@ whisk { # and exposed as metrics. If any reindexing is in progress then its progress would be logged with this frequency record-usage-frequency = 10 m + # Flag to enable collection of retry stats. This feature works by registering with Logback to intercept + # log messages and based on that collect stats + retry-stats-enabled = true + connection-policy { max-pool-size = 1000 # When the value of this property is true, the SDK will direct write operations to @@ -260,6 +272,8 @@ whisk { retry-options { # Sets the maximum number of retries in the case where the request fails # because the service has applied rate limiting on the client. 
+ + # If this value is changed then adjust the buckets under `kamon.prometheus` max-retry-attempts-on-throttled-requests = 9 # Sets the maximum retry time diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala index 1ddc8b8ff3a..1ff638f130c 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala @@ -19,6 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb import _root_.rx.RxReactiveStreams import akka.actor.ActorSystem +import akka.event.Logging.InfoLevel import akka.http.scaladsl.model.{ContentType, StatusCodes, Uri} import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Sink, Source} @@ -71,6 +72,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected private val getToken = createToken("get") private val queryToken = createToken("query") private val countToken = createToken("count") + private val docSizeToken = createDocSizeToken() private val documentsSizeToken = createUsageToken("documentsSize", MeasurementUnit.information.kilobytes) private val indexSizeToken = createUsageToken("indexSize", MeasurementUnit.information.kilobytes) @@ -99,7 +101,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = { val asJson = d.toDocumentRecord - val doc = toCosmosDoc(asJson) + val (doc, docSize) = toCosmosDoc(asJson) val id = doc.getId val docinfoStr = s"id: $id, rev: ${doc.getETag}" val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$collName' saving document: '$docinfoStr'") @@ -136,7 +138,12 @@ class CosmosDBArtifactStore[DocumentAbstraction 
<: DocumentSerializer](protected } .transform( { r => - transid.finished(this, start, s"[PUT] '$collName' completed document: '$docinfoStr'") + docSizeToken.histogram.record(docSize) + transid.finished( + this, + start, + s"[PUT] '$collName' completed document: '$docinfoStr', size=$docSize, ru=${r.getRequestCharge}${extraLogs(r)}", + InfoLevel) collectMetrics(putToken, r.getRequestCharge) toDocInfo(r.getResource) }, { @@ -158,8 +165,8 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected } val g = f .transform( - { _ => - transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'") + { r => + transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'${extraLogs(r)}", InfoLevel) true }, { case e: DocumentClientException if isNotFound(e) => @@ -194,7 +201,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected private def softDeletePut(docInfo: DocInfo, js: JsObject)(implicit transid: TransactionId) = { val deletedJs = transform(js, Seq((deleted, Some(JsTrue)))) - val doc = toCosmosDoc(deletedJs) + val (doc, _) = toCosmosDoc(deletedJs) softDeleteTTL.foreach(doc.setTimeToLive(_)) val f = client.replaceDocument(doc, matchRevOption(docInfo)).head() f.foreach(r => collectMetrics(putToken, r.getRequestCharge)) @@ -220,8 +227,13 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected // for compatibility throw NoDocumentException("not found on 'get'") } else { - val js = getResultToWhiskJsonDoc(rr.getResource) - transid.finished(this, start, s"[GET] '$collName' completed: found document '$doc'") + val (js, docSize) = getResultToWhiskJsonDoc(rr.getResource) + transid + .finished( + this, + start, + s"[GET] '$collName' completed: found document '$doc',size=$docSize, ru=${rr.getRequestCharge}${extraLogs(rr)}", + InfoLevel) deserialize[A, DocumentAbstraction](doc, js) } }, { @@ -254,7 +266,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: '$id' not found") None } else { - val js = getResultToWhiskJsonDoc(rr.getResource) + val (js, _) = getResultToWhiskJsonDoc(rr.getResource) transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: found document '$id'") Some(js) } @@ -292,7 +304,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected .readDocument(selfLinkOf(id), newRequestOption(id)) .head() .map { rr => - val js = getResultToWhiskJsonDoc(rr.getResource) + val (js, _) = getResultToWhiskJsonDoc(rr.getResource) collectMetrics(getToken, rr.getRequestCharge) js } @@ -358,7 +370,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected this, s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]") } - transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}") + transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}", InfoLevel) } reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'") } @@ -465,7 +477,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected e.getStatusCode == StatusCodes.Conflict.intValue || e.getStatusCode == StatusCodes.PreconditionFailed.intValue } - private def toCosmosDoc(json: JsObject): Document = { + private def toCosmosDoc(json: JsObject): (Document, Int) = { val computedJs = documentHandler.computedFields(json) val computedOpt = if (computedJs.fields.nonEmpty) Some(computedJs) else None val fieldsToAdd = @@ -476,10 +488,11 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected (clusterId, clusterIdValue)) val fieldsToRemove = Seq(_id, _rev) val mapped = transform(json, fieldsToAdd, fieldsToRemove) - val doc = new Document(mapped.compactPrint) + val jsonString = mapped.compactPrint + val doc = 
new Document(jsonString) doc.set(selfLink, createSelfLink(doc.getId)) doc.setTimeToLive(null) //Disable any TTL if in effect for earlier revision - doc + (doc, jsonString.length) } private def queryResultToWhiskJsonDoc(doc: Document): JsObject = { @@ -490,10 +503,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected toWhiskJsonDoc(js, id, None) } - private def getResultToWhiskJsonDoc(doc: Document): JsObject = { + private def getResultToWhiskJsonDoc(doc: Document): (JsObject, Int) = { checkDoc(doc) - val js = doc.toJson.parseJson.asJsObject - toWhiskJsonDoc(js, doc.getId, Some(JsString(doc.getETag))) + val jsString = doc.toJson + val js = jsString.parseJson.asJsObject + val whiskDoc = toWhiskJsonDoc(js, doc.getId, Some(JsString(doc.getETag))) + (whiskDoc, jsString.length) } private def toDocInfo[T <: Resource](doc: T) = { @@ -552,9 +567,23 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected else LogMarkerToken("cosmosdb", name, collName)(unit) } + private def createDocSizeToken(): LogMarkerToken = { + val unit = MeasurementUnit.information.bytes + val name = "doc" + val tags = Map("collection" -> collName) + if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", name, "size", tags = tags)(unit) + else LogMarkerToken("cosmosdb", name, collName)(unit) + } + private def isSoftDeleted(doc: Document) = doc.getBoolean(deleted) == true private def isSoftDeleted(js: JsObject) = js.fields.get(deleted).contains(JsTrue) private def isNewDocument(doc: Document) = doc.getETag == null + + private def extraLogs(r: ResourceResponse[_])(implicit tid: TransactionId): String = { + if (tid.meta.extraLogging) { + " " + r.getRequestDiagnosticsString + } else "" + } } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala index 
72f9f1029da..7d3ac3ee338 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala @@ -41,6 +41,8 @@ object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider { type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference private val clients = collection.mutable.Map[CosmosDBConfig, ReferenceCounted[ClientHolder]]() + RetryMetricsCollector.registerIfEnabled() + override def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)( implicit jsonFormat: RootJsonFormat[D], docReader: DocumentReader, diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/RetryMetricsCollector.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/RetryMetricsCollector.scala new file mode 100644 index 00000000000..d5f3d4de1da --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/RetryMetricsCollector.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openwhisk.core.database.cosmosdb + +import akka.event.slf4j.SLF4JLogging +import ch.qos.logback.classic.LoggerContext +import ch.qos.logback.classic.spi.ILoggingEvent +import ch.qos.logback.core.AppenderBase +import com.microsoft.azure.cosmosdb.rx.internal.ResourceThrottleRetryPolicy +import org.apache.openwhisk.common.{Counter => WhiskCounter} +import kamon.metric.{Counter, MeasurementUnit} +import org.apache.openwhisk.common.{LogMarkerToken, TransactionId} +import org.apache.openwhisk.core.ConfigKeys +import org.slf4j.LoggerFactory +import pureconfig._ + +import scala.util.Try + +object CosmosDBAction extends Enumeration { + val Create, Query, Get, Others = Value +} + +object RetryMetricsCollector extends AppenderBase[ILoggingEvent] with SLF4JLogging { + import CosmosDBAction._ + private val tokens = + Map(Create -> Token(Create), Query -> Token(Query), Get -> Token(Get), Others -> Token(Others)) + + val retryCounter = new WhiskCounter + private[cosmosdb] def registerIfEnabled(): Unit = { + val enabled = loadConfigOrThrow[Boolean](s"${ConfigKeys.cosmosdb}.retry-stats-enabled") + if (enabled) { + log.info("Enabling retry metrics collector") + register() + } + } + + /** + * CosmosDB uses below log message + * ``` + * logger.warn( + * "Operation will be retried after {} milliseconds. 
Current attempt {}, Cumulative delay {}", + * retryDelay.toMillis(), + * this.currentAttemptCount, + * this.cumulativeRetryDelay, + * exception); + * ``` + * + */ + override def append(e: ILoggingEvent): Unit = { + val msg = e.getMessage + val errorMsg = Option(e.getThrowableProxy).map(_.getMessage).getOrElse(msg) + for { + success <- isSuccessOrFailedRetry(msg) + token <- tokens.get(operationType(errorMsg)) + } { + if (success) { + token.success.counter.increment() + //Element 1 has the count + val attemptCount = getRetryAttempt(e.getArgumentArray, 1) + token.success.histogram.record(attemptCount) + + //Used mostly for test mode where tags may be disabled + //and test need to determine if count is increased + if (!TransactionId.metricsKamonTags) { + retryCounter.next() + } + } else { + token.failed.counter.increment() + } + } + } + + def getCounter(opType: CosmosDBAction.Value, retryPassed: Boolean = true): Option[Counter] = { + tokens.get(opType).map(t => if (retryPassed) t.success else t.failed).map { _.counter } + } + + private def getRetryAttempt(args: Array[AnyRef], index: Int) = { + val t = Try { + if (args != null & args.length > index) { + args(index) match { + case n: Number => n.intValue() + case _ => 0 + } + } else 0 + } + t.getOrElse(0) + } + + private def register(): Unit = { + val logCtx = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext] + val retryLogger = logCtx.getLogger(classOf[ResourceThrottleRetryPolicy].getName) + start() + retryLogger.addAppender(this) + } + + private def isSuccessOrFailedRetry(msg: String) = { + if (msg.startsWith("Operation will be retried after")) Some(true) + else if (msg.startsWith("Operation will NOT be retried")) Some(false) + else None + } + + private def operationType(errorMsg: String) = { + if (errorMsg.contains("OperationType: Query")) Query + else if (errorMsg.contains("OperationType: Create")) Create + else if (errorMsg.contains("OperationType: Get")) Get + else Others + } + + private def 
createToken(opType: String, retryPassed: Boolean): LogMarkerToken = { + val action = if (retryPassed) "success" else "failed" + val tags = Map("type" -> opType) + if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "retry", action, tags = tags)(MeasurementUnit.none) + else LogMarkerToken("cosmosdb", "retry", action, Some(opType))(MeasurementUnit.none) + } + + private case class Token(success: LogMarkerToken, failed: LogMarkerToken) + + private object Token { + def apply(opType: CosmosDBAction.Value): Token = + new Token(createToken(opType.toString, retryPassed = true), createToken(opType.toString, retryPassed = false)) + } +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala index d05c6072717..0cd5f434212 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala @@ -17,9 +17,13 @@ package org.apache.openwhisk.core.database.cosmosdb +import java.util.concurrent.CountDownLatch + +import akka.stream.scaladsl.Source import com.typesafe.config.ConfigFactory import io.netty.util.ResourceLeakDetector import io.netty.util.ResourceLeakDetector.Level +import kamon.metric.LongAdderCounter import org.apache.openwhisk.common.TransactionId import org.apache.openwhisk.core.database.DocumentSerializer import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider @@ -28,6 +32,7 @@ import org.apache.openwhisk.core.entity.WhiskQueries.TOP import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.core.entity.{ DocumentReader, + Parameters, WhiskActivation, WhiskDocumentReader, WhiskEntity, @@ -35,14 +40,11 @@ import org.apache.openwhisk.core.entity.{ WhiskPackage } import org.junit.runner.RunWith -import 
org.scalatest.junit.JUnitRunner -import org.scalatest.FlatSpec -import spray.json.JsString -import org.apache.openwhisk.core.entity.size._ -import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner +import spray.json.JsString +import scala.concurrent.duration._ import scala.reflect.ClassTag @RunWith(classOf[JUnitRunner]) @@ -154,4 +156,41 @@ class CosmosDBArtifactStoreTests extends FlatSpec with CosmosDBStoreBehaviorBase stream.toString should include("[QueryMetricsEnabled]") } + + behavior of "CosmosDB retry metrics" + + it should "capture success retries" in { + implicit val tid: TransactionId = TransactionId.testing + val bigPkg = WhiskPackage(newNS(), aname(), parameters = Parameters("foo", "x" * 1024 * 1024)) + val latch = new CountDownLatch(1) + val f = Source(1 to 500) + .mapAsync(100) { i => + latch.countDown() + if (i % 5 == 0) println(i) + require(retryCount == 0) + entityStore.put(bigPkg) + } + .runForeach { doc => + docsToDelete += ((entityStore, doc)) + } + + //Wait for one save operation before checking for stats + latch.await() + retry(() => f, 500.millis) + retryCount should be > 0 + } + + private def retryCount: Int = { + //If KamonTags are disabled then Kamon uses CounterMetricImpl which does not provide + //any way of determining the current count. So in those cases the retry collector + //would increment a counter + if (TransactionId.metricsKamonTags) { + RetryMetricsCollector.getCounter(CosmosDBAction.Create) match { + case Some(x: LongAdderCounter) => x.snapshot(false).value.toInt + case _ => 0 + } + } else { + RetryMetricsCollector.retryCounter.cur.toInt + } + } }