Worker lambda metrics #734

Merged (15 commits) on Aug 31, 2022
@@ -11,10 +11,11 @@ import db._
import doobie.util.transactor.Transactor
import fs2.{Pipe, Stream}
import org.slf4j.{Logger, LoggerFactory}
import org.threeten.bp.ZoneOffset

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.jdk.CollectionConverters._
import java.time.{Duration, Instant}
import java.time.{Duration, Instant, LocalDateTime, ZoneId}

sealed trait WorkerSqs
object WorkerSqs {
@@ -96,15 +97,12 @@ trait HarvesterRequestHandler extends Logging {
}

def processNotification(event: SQSEvent, tokenService: TokenService[IO]) = {
val start = Instant.now
val records = event.getRecords.asScala.toList.map(r => r.getBody).map(NotificationParser.parseShardNotificationEvent)
records.foreach(record =>
logger.info(Map(
"notificationId" -> record.notification.id,
"harvester.notificationProcessingStartTime.millis" -> start.toEpochMilli,
"harvester.notificationProcessingStartTime.string" -> start.toString,
val records = event.getRecords.asScala.toList.map(r => (NotificationParser.parseShardNotificationEvent(r.getBody), r.getAttributes))
records.foreach {
case (body, _) => logger.info(Map(
"notificationId" -> body.notification.id,
), "Parsed notification event")
)
}
val shardNotificationStream: Stream[IO, ShardedNotification] = Stream.emits(event.getRecords.asScala)
.map(r => r.getBody)
.map(NotificationParser.parseShardNotificationEvent)
@@ -115,25 +113,47 @@
.unsafeRunSync()
}catch {
case e: Throwable => {
records.foreach(record =>
logger.error(Map(
"notificationId" -> record.notification.id,
"notificationType" -> record.notification.`type`.toString,
), s"Error occurred: ${e.getMessage}", e)
)
records.foreach {
case (body, _) =>
logger.error(Map(
"notificationId" -> body.notification.id,
"notificationType" -> body.notification.`type`.toString,
), s"Error occurred: ${e.getMessage}", e)
}
throw e
}
}finally {
val end = Instant.now
records.foreach(record =>
logger.info(Map(
"notificationId" -> record.notification.id,
"notificationType" -> record.notification.`type`.toString,
"harvester.notificationProcessingTime" -> Duration.between(start, end).toMillis,
"harvester.notificationProcessingEndTime.millis" -> end.toEpochMilli,
"harvester.notificationProcessingEndTime.string" -> end.toString,
), "Finished processing notification event")
)
records.foreach {
[Inline review comment from the PR author] At the moment the harvester batchSize is 1. I think that if we increased it, logging in the finally block wouldn't give us accurate metrics. Not something I need to address now, just a consideration for the future.

case (body, attributes) => {
val end = Instant.now
val sentTime = Instant.ofEpochMilli(attributes.getOrDefault("SentTimestamp", "0").toLong)

logger.info(Map(
"_aws" -> Map(
"Timestamp" -> end.toEpochMilli,
"CloudWatchMetrics" -> List(Map(
"Namespace" -> s"Notifications/${env.stage}/harvester",
"Dimensions" -> List(List("type")),
"Metrics" -> List(Map(
"Name" -> "harvester.notificationProcessingTime",
"Unit" -> "Milliseconds"
))
))
),
"harvester.notificationProcessingTime" -> Duration.between(sentTime, end).toMillis,
"harvester.notificationProcessingEndTime.millis" -> end.toEpochMilli,
"harvester.notificationProcessingStartTime.millis" -> sentTime.toEpochMilli,
"notificationId" -> body.notification.id,
"notificationType" -> body.notification.`type`.toString,
"type" -> {
body.notification.`type` match {
case _root_.models.NotificationType.BreakingNews => "breakingNews"
case _ => "other"
}
}
), "Finished processing notification event")
}
}
}
}
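For context: the `_aws` map above is CloudWatch's Embedded Metric Format (EMF). When the whole log-field map is emitted as one JSON object on a single log line, CloudWatch Logs extracts `harvester.notificationProcessingTime` as a metric in the `Notifications/<stage>/harvester` namespace, dimensioned by `type`, with no separate PutMetricData call. A minimal standalone sketch of the same payload shape (the `emfFields` helper is illustrative, not part of this PR; the JSON serialisation done by the logging setup is assumed, not shown):

import java.time.Instant

// Builds an EMF-shaped map for a single millisecond-valued metric.
// CloudWatch reads the metric definition from "_aws" and the value from
// the top-level key of the same name; dimension keys must also appear
// as top-level fields.
def emfFields(namespace: String, metricName: String, valueMillis: Long, dimensions: Map[String, String]): Map[String, Any] =
  Map(
    "_aws" -> Map(
      "Timestamp" -> Instant.now.toEpochMilli,
      "CloudWatchMetrics" -> List(Map(
        "Namespace" -> namespace,
        "Dimensions" -> List(dimensions.keys.toList),
        "Metrics" -> List(Map("Name" -> metricName, "Unit" -> "Milliseconds"))
      ))
    ),
    metricName -> valueMillis
  ) ++ dimensions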

@@ -1,14 +1,14 @@
package com.gu.notifications.worker

import java.util.UUID
import _root_.models.ShardRange
import _root_.models.{Notification, NotificationType, ShardRange}
import cats.effect.{ContextShift, IO, Timer}
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.SQSEvent
import com.gu.notifications.worker.cleaning.CleaningClient
import com.gu.notifications.worker.delivery.DeliveryException.InvalidToken
import com.gu.notifications.worker.delivery._
import com.gu.notifications.worker.models.SendingResults
import models.SendingResults
import com.gu.notifications.worker.tokens.{ChunkedTokens, IndividualNotification}
import com.gu.notifications.worker.utils.{Cloudwatch, Logging, NotificationParser, Reporting}
import fs2.{Pipe, Stream}
@@ -34,16 +34,33 @@ trait SenderRequestHandler[C <: DeliveryClient] extends Logging {
implicit val timer: Timer[IO] = IO.timer(ec)
implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)

def reportSuccesses[C <: DeliveryClient](notificationId: UUID, range: ShardRange, start: Instant): Pipe[IO, Either[DeliveryException, DeliverySuccess], Unit] = { input =>
val notificationLog = s"(notification: ${notificationId} ${range})"
def reportSuccesses[C <: DeliveryClient](notification: Notification, range: ShardRange, sentTime: Long): Pipe[IO, Either[DeliveryException, DeliverySuccess], Unit] = { input =>
val notificationLog = s"(notification: ${notification.id} $range)"
val start = Instant.ofEpochMilli(sentTime)
val end = Instant.now
val logFields = Map(
"notificationId" -> notificationId,
"_aws" -> Map(
"Timestamp" -> end.toEpochMilli,
"CloudWatchMetrics" -> List(Map(
"Namespace" -> s"Notifications/${env.stage}/workers",
"Dimensions" -> List(List("platform", "type")),
"Metrics" -> List(Map(
"Name" -> "worker.notificationProcessingTime",
"Unit" -> "Milliseconds"
))
))
),
"notificationId" -> notification.id,
"platform" -> Configuration.platform.map(_.toString).getOrElse("unknown"),
"type" -> {
notification.`type` match {
case NotificationType.BreakingNews => "breakingNews"
case _ => "other"
}
},
"worker.notificationProcessingTime" -> Duration.between(start, end).toMillis,
"worker.notificationProcessingStartTime.millis" -> start.toEpochMilli,
"worker.notificationProcessingStartTime.string" -> start.toString,
"worker.notificationProcessingStartTime.millis" -> sentTime,
"worker.notificationProcessingEndTime.millis" -> end.toEpochMilli,
"worker.notificationProcessingEndTime.string" -> end.toString,
)
input.fold(SendingResults.empty) { case (acc, resp) => SendingResults.aggregate(acc, resp) }
.evalTap(logInfoWithFields(logFields, prefix = s"Results $notificationLog: "))
@@ -72,22 +89,21 @@ trait SenderRequestHandler[C <: DeliveryClient] extends Logging {
.evalTap(Reporting.log(s"Sending failure: "))
} yield resp

def deliverChunkedTokens(chunkedTokenStream: Stream[IO, ChunkedTokens], start: Instant): Stream[IO, Unit] = {
def deliverChunkedTokens(chunkedTokenStream: Stream[IO, (ChunkedTokens, Long)]): Stream[IO, Unit] = {
for {
chunkedTokens <- chunkedTokenStream
(chunkedTokens, sentTime) <- chunkedTokenStream
individualNotifications = Stream.emits(chunkedTokens.toNotificationToSends).covary[IO]
resp <- deliverIndividualNotificationStream(individualNotifications)
.broadcastTo(reportSuccesses(chunkedTokens.notification.id, chunkedTokens.range, start), cleanupFailures, trackProgress(chunkedTokens.notification.id))
.broadcastTo(reportSuccesses(chunkedTokens.notification, chunkedTokens.range, sentTime), cleanupFailures, trackProgress(chunkedTokens.notification.id))
} yield resp
}

def handleChunkTokens(event: SQSEvent, context: Context): Unit = {
val start = Instant.now
val chunkedTokenStream: Stream[IO, ChunkedTokens] = Stream.emits(event.getRecords.asScala)
.map(r => r.getBody)
.map(NotificationParser.parseChunkedTokenEvent)
val chunkedTokenStream: Stream[IO, (ChunkedTokens, Long)] = Stream.emits(event.getRecords.asScala)
.map(r => (r.getBody, r.getAttributes.getOrDefault("SentTimestamp", "0").toLong))
.map { case (body, sentTimestamp) => (NotificationParser.parseChunkedTokenEvent(body), sentTimestamp) }

deliverChunkedTokens(chunkedTokenStream, start)
deliverChunkedTokens(chunkedTokenStream)
.compile
.drain
.unsafeRunSync()
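Worth noting: `SentTimestamp` is a system attribute that SQS stamps on the message at enqueue time, so the durations computed against it measure queue wait plus handler time rather than handler time alone. A minimal sketch of the extraction, using the same Lambda `SQSEvent` types as the handler (illustrative, not part of the PR):

import java.time.{Duration, Instant}
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage

// "SentTimestamp" arrives as epoch milliseconds in string form; defaulting
// to "0" (as the handler does) makes a missing attribute show up as an
// implausibly large duration instead of throwing.
def millisSinceSent(message: SQSMessage): Long = {
  val sent = Instant.ofEpochMilli(message.getAttributes.getOrDefault("SentTimestamp", "0").toLong)
  Duration.between(sent, Instant.now).toMillis
}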
@@ -74,6 +74,7 @@ class HarvesterRequestHandlerSpec extends Specification with Matchers {
val event = new SQSEvent()
val sqsMessage = new SQSMessage()
sqsMessage.setBody(Json.stringify(Json.toJson(shardedNotification)))
sqsMessage.setAttributes((Map("SentTimestamp" -> "10").asJava))
event.setRecords(List(sqsMessage).asJava)
event
}
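Since the fixture sets `SentTimestamp` to `"10"` (10 ms after the epoch), any duration the handler derives from it will be roughly the current epoch time in milliseconds; presumably the specs exercise the logging path rather than assert the metric's value. What the fixture yields (illustrative):

import java.time.Instant

val sent = Instant.ofEpochMilli("10".toLong) // 1970-01-01T00:00:00.010Z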
@@ -95,6 +95,7 @@ class SenderRequestHandlerSpec extends Specification with Matchers {
val event = new SQSEvent()
val sqsMessage = new SQSMessage()
sqsMessage.setBody(Json.stringify(Json.toJson(chunkedTokens)))
sqsMessage.setAttributes((Map("SentTimestamp" -> "10").asJava))
event.setRecords(List(sqsMessage).asJava)
event
}