Capture document size, RU usage, retry stats for get and put in CosmosDB (apache#4652)

* Enable collection of retry stats
* Log document size and RU used for get and put operations
* Include extra logs for transactions with debug mode enabled
* Histogram metric for document size
* Add custom histogram buckets for retry stats
chetanmeh committed Oct 1, 2019
1 parent ef1ec9e commit 837bca3
Showing 5 changed files with 242 additions and 21 deletions.
14 changes: 14 additions & 0 deletions common/scala/src/main/resources/application.conf
@@ -70,6 +70,14 @@ kamon {
prometheus {
# We expose the metrics endpoint over akka http. So default server is disabled
start-embedded-http-server = no

buckets {
custom {
//By default, retries are configured up to 9. However, for certain setups we may increase
//it to a higher value
"histogram.cosmosdb_retry_success" = [1, 2, 3, 5, 7, 10, 12, 15, 20]
}
}
}

reporters = [
@@ -244,6 +252,10 @@ whisk {
# and exposed as metrics. If any reindexing is in progress then its progress would be logged with this frequency
record-usage-frequency = 10 m

# Flag to enable collection of retry stats. This feature works by registering with Logback to intercept
# log messages and collect stats based on them
retry-stats-enabled = true

connection-policy {
max-pool-size = 1000
# When the value of this property is true, the SDK will direct write operations to
@@ -260,6 +272,8 @@ whisk {
retry-options {
# Sets the maximum number of retries in the case where the request fails
# because the service has applied rate limiting on the client.

# If this value is changed, adjust the buckets under `kamon.prometheus.buckets.custom`
max-retry-attempts-on-throttled-requests = 9

# Sets the maximum retry time
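
A minimal sketch (not part of this commit) of how the bucket override above takes effect: Kamon's Prometheus reporter keys custom buckets by metric name, so a histogram recorded as `cosmosdb_retry_success` is exported with the configured boundaries instead of the defaults. The object name below is hypothetical and a Kamon 1.x-style API is assumed.

object RetryBucketSketch extends App {
  // Record into the histogram that the custom buckets above apply to.
  val retrySuccess = kamon.Kamon.histogram("cosmosdb_retry_success")
  // A retry that succeeded on attempt 3 counts toward the le=3 bucket (and,
  // Prometheus buckets being cumulative, all larger ones) under the
  // boundaries [1, 2, 3, 5, 7, 10, 12, 15, 20].
  retrySuccess.record(3)
}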
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb

import _root_.rx.RxReactiveStreams
import akka.actor.ActorSystem
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.model.{ContentType, StatusCodes, Uri}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
@@ -71,6 +72,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
private val getToken = createToken("get")
private val queryToken = createToken("query")
private val countToken = createToken("count")
private val docSizeToken = createDocSizeToken()

private val documentsSizeToken = createUsageToken("documentsSize", MeasurementUnit.information.kilobytes)
private val indexSizeToken = createUsageToken("indexSize", MeasurementUnit.information.kilobytes)
@@ -99,7 +101,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
val asJson = d.toDocumentRecord

val doc = toCosmosDoc(asJson)
val (doc, docSize) = toCosmosDoc(asJson)
val id = doc.getId
val docinfoStr = s"id: $id, rev: ${doc.getETag}"
val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$collName' saving document: '$docinfoStr'")
@@ -136,7 +138,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
}
.transform(
{ r =>
transid.finished(this, start, s"[PUT] '$collName' completed document: '$docinfoStr'")
docSizeToken.histogram.record(docSize)
transid.finished(
this,
start,
s"[PUT] '$collName' completed document: '$docinfoStr', size=$docSize, ru=${r.getRequestCharge}${extraLogs(r)}",
InfoLevel)
collectMetrics(putToken, r.getRequestCharge)
toDocInfo(r.getResource)
}, {
@@ -158,8 +165,8 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
}
val g = f
.transform(
{ _ =>
transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'")
{ r =>
transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'${extraLogs(r)}", InfoLevel)
true
}, {
case e: DocumentClientException if isNotFound(e) =>
@@ -194,7 +201,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected

private def softDeletePut(docInfo: DocInfo, js: JsObject)(implicit transid: TransactionId) = {
val deletedJs = transform(js, Seq((deleted, Some(JsTrue))))
val doc = toCosmosDoc(deletedJs)
val (doc, _) = toCosmosDoc(deletedJs)
softDeleteTTL.foreach(doc.setTimeToLive(_))
val f = client.replaceDocument(doc, matchRevOption(docInfo)).head()
f.foreach(r => collectMetrics(putToken, r.getRequestCharge))
@@ -220,8 +227,13 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
// for compatibility
throw NoDocumentException("not found on 'get'")
} else {
val js = getResultToWhiskJsonDoc(rr.getResource)
transid.finished(this, start, s"[GET] '$collName' completed: found document '$doc'")
val (js, docSize) = getResultToWhiskJsonDoc(rr.getResource)
transid
.finished(
this,
start,
s"[GET] '$collName' completed: found document '$doc',size=$docSize, ru=${rr.getRequestCharge}${extraLogs(rr)}",
InfoLevel)
deserialize[A, DocumentAbstraction](doc, js)
}
}, {
@@ -254,7 +266,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: '$id' not found")
None
} else {
val js = getResultToWhiskJsonDoc(rr.getResource)
val (js, _) = getResultToWhiskJsonDoc(rr.getResource)
transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: found document '$id'")
Some(js)
}
@@ -292,7 +304,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
.readDocument(selfLinkOf(id), newRequestOption(id))
.head()
.map { rr =>
val js = getResultToWhiskJsonDoc(rr.getResource)
val (js, _) = getResultToWhiskJsonDoc(rr.getResource)
collectMetrics(getToken, rr.getRequestCharge)
js
}
@@ -358,7 +370,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
this,
s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]")
}
transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}")
transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}", InfoLevel)
}
reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'")
}
@@ -465,7 +477,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
e.getStatusCode == StatusCodes.Conflict.intValue || e.getStatusCode == StatusCodes.PreconditionFailed.intValue
}

private def toCosmosDoc(json: JsObject): Document = {
private def toCosmosDoc(json: JsObject): (Document, Int) = {
val computedJs = documentHandler.computedFields(json)
val computedOpt = if (computedJs.fields.nonEmpty) Some(computedJs) else None
val fieldsToAdd =
@@ -476,10 +488,11 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
(clusterId, clusterIdValue))
val fieldsToRemove = Seq(_id, _rev)
val mapped = transform(json, fieldsToAdd, fieldsToRemove)
val doc = new Document(mapped.compactPrint)
val jsonString = mapped.compactPrint
val doc = new Document(jsonString)
doc.set(selfLink, createSelfLink(doc.getId))
doc.setTimeToLive(null) //Disable any TTL if in effect for earlier revision
doc
(doc, jsonString.length)
}

private def queryResultToWhiskJsonDoc(doc: Document): JsObject = {
@@ -490,10 +503,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
toWhiskJsonDoc(js, id, None)
}

private def getResultToWhiskJsonDoc(doc: Document): JsObject = {
private def getResultToWhiskJsonDoc(doc: Document): (JsObject, Int) = {
checkDoc(doc)
val js = doc.toJson.parseJson.asJsObject
toWhiskJsonDoc(js, doc.getId, Some(JsString(doc.getETag)))
val jsString = doc.toJson
val js = jsString.parseJson.asJsObject
val whiskDoc = toWhiskJsonDoc(js, doc.getId, Some(JsString(doc.getETag)))
(whiskDoc, jsString.length)
}

private def toDocInfo[T <: Resource](doc: T) = {
@@ -552,9 +567,23 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
else LogMarkerToken("cosmosdb", name, collName)(unit)
}

private def createDocSizeToken(): LogMarkerToken = {
val unit = MeasurementUnit.information.bytes
val name = "doc"
val tags = Map("collection" -> collName)
if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", name, "size", tags = tags)(unit)
else LogMarkerToken("cosmosdb", name, collName)(unit)
}

private def isSoftDeleted(doc: Document) = doc.getBoolean(deleted) == true

private def isSoftDeleted(js: JsObject) = js.fields.get(deleted).contains(JsTrue)

private def isNewDocument(doc: Document) = doc.getETag == null

private def extraLogs(r: ResourceResponse[_])(implicit tid: TransactionId): String = {
if (tid.meta.extraLogging) {
" " + r.getRequestDiagnosticsString
} else ""
}
}
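
One note on the size capture above (a sketch, not from the commit): `toCosmosDoc` and `getResultToWhiskJsonDoc` report `jsonString.length`, which counts UTF-16 characters, while `createDocSizeToken` declares the unit as bytes. The two agree for ASCII-only JSON, as the hypothetical example below illustrates.

object DocSizeSketch extends App {
  import java.nio.charset.StandardCharsets

  val ascii = """{"_id":"whisk.system/echo"}"""
  // ASCII content: char count equals UTF-8 byte count (27 and 27).
  println(s"${ascii.length} chars, ${ascii.getBytes(StandardCharsets.UTF_8).length} bytes")

  val accented = """{"note":"sizé"}"""
  // 'é' encodes to two bytes in UTF-8, so 15 chars become 16 bytes.
  println(s"${accented.length} chars, ${accented.getBytes(StandardCharsets.UTF_8).length} bytes")
}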
@@ -41,6 +41,8 @@ object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference
private val clients = collection.mutable.Map[CosmosDBConfig, ReferenceCounted[ClientHolder]]()

RetryMetricsCollector.registerIfEnabled()

override def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
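
Because `CosmosDBArtifactStoreProvider` is a Scala object, the `registerIfEnabled()` call above runs in its one-time initializer, so the appender is registered at most once per classloader regardless of how many stores are created. An illustrative sketch of that semantics, with hypothetical names:

object InitOnceDemo {
  println("appender registered") // an object body runs only once
}

object InitOnceMain extends App {
  InitOnceDemo // first reference triggers the initializer
  InitOnceDemo // later references do not re-run it
}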
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.database.cosmosdb

import akka.event.slf4j.SLF4JLogging
import ch.qos.logback.classic.LoggerContext
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.AppenderBase
import com.microsoft.azure.cosmosdb.rx.internal.ResourceThrottleRetryPolicy
import org.apache.openwhisk.common.{Counter => WhiskCounter}
import kamon.metric.{Counter, MeasurementUnit}
import org.apache.openwhisk.common.{LogMarkerToken, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.slf4j.LoggerFactory
import pureconfig._

import scala.util.Try

object CosmosDBAction extends Enumeration {
val Create, Query, Get, Others = Value
}

object RetryMetricsCollector extends AppenderBase[ILoggingEvent] with SLF4JLogging {
import CosmosDBAction._
private val tokens =
Map(Create -> Token(Create), Query -> Token(Query), Get -> Token(Get), Others -> Token(Others))

val retryCounter = new WhiskCounter
private[cosmosdb] def registerIfEnabled(): Unit = {
val enabled = loadConfigOrThrow[Boolean](s"${ConfigKeys.cosmosdb}.retry-stats-enabled")
if (enabled) {
log.info("Enabling retry metrics collector")
register()
}
}

/**
* The CosmosDB SDK emits the log message below when retrying
* ```
* logger.warn(
* "Operation will be retried after {} milliseconds. Current attempt {}, Cumulative delay {}",
* retryDelay.toMillis(),
* this.currentAttemptCount,
* this.cumulativeRetryDelay,
* exception);
* ```
*
*/
override def append(e: ILoggingEvent): Unit = {
val msg = e.getMessage
val errorMsg = Option(e.getThrowableProxy).map(_.getMessage).getOrElse(msg)
for {
success <- isSuccessOrFailedRetry(msg)
token <- tokens.get(operationType(errorMsg))
} {
if (success) {
token.success.counter.increment()
//Argument at index 1 holds the current attempt count
val attemptCount = getRetryAttempt(e.getArgumentArray, 1)
token.success.histogram.record(attemptCount)

//Used mostly in test mode, where tags may be disabled
//and tests need to determine if the count increased
if (!TransactionId.metricsKamonTags) {
retryCounter.next()
}
} else {
token.failed.counter.increment()
}
}
}

def getCounter(opType: CosmosDBAction.Value, retryPassed: Boolean = true): Option[Counter] = {
tokens.get(opType).map(t => if (retryPassed) t.success else t.failed).map { _.counter }
}

private def getRetryAttempt(args: Array[AnyRef], index: Int) = {
val t = Try {
if (args != null && args.length > index) {
args(index) match {
case n: Number => n.intValue()
case _ => 0
}
} else 0
}
t.getOrElse(0)
}

private def register(): Unit = {
val logCtx = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext]
val retryLogger = logCtx.getLogger(classOf[ResourceThrottleRetryPolicy].getName)
start()
retryLogger.addAppender(this)
}

private def isSuccessOrFailedRetry(msg: String) = {
if (msg.startsWith("Operation will be retried after")) Some(true)
else if (msg.startsWith("Operation will NOT be retried")) Some(false)
else None
}

private def operationType(errorMsg: String) = {
if (errorMsg.contains("OperationType: Query")) Query
else if (errorMsg.contains("OperationType: Create")) Create
else if (errorMsg.contains("OperationType: Get")) Get
else Others
}

private def createToken(opType: String, retryPassed: Boolean): LogMarkerToken = {
val action = if (retryPassed) "success" else "failed"
val tags = Map("type" -> opType)
if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "retry", action, tags = tags)(MeasurementUnit.none)
else LogMarkerToken("cosmosdb", "retry", action, Some(opType))(MeasurementUnit.none)
}

private case class Token(success: LogMarkerToken, failed: LogMarkerToken)

private object Token {
def apply(opType: CosmosDBAction.Value): Token =
new Token(createToken(opType.toString, retryPassed = true), createToken(opType.toString, retryPassed = false))
}
}
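
A hedged usage sketch, using only names defined in the new file: once registered, the appender observes the SDK's `ResourceThrottleRetryPolicy` log lines, and tests can look up the per-operation counters via `getCounter`. The driver comment and `App` wrapper below are assumptions.

object RetryStatsUsageSketch extends App {
  // No-op unless whisk.cosmosdb.retry-stats-enabled = true.
  RetryMetricsCollector.registerIfEnabled()

  // ... perform throttled CosmosDB queries here so the SDK logs retries ...

  // Some(counter) when Kamon tags are enabled; it increments each time an
  // "Operation will be retried after ..." line is appended for a Query.
  val queryRetries = RetryMetricsCollector.getCounter(CosmosDBAction.Query, retryPassed = true)
  println(s"query retry counter present: ${queryRetries.isDefined}")
}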
