From e0c7650fc72dd74a282cc3bd0e901b3ef5672fb7 Mon Sep 17 00:00:00 2001 From: Qi Wang Date: Mon, 16 Nov 2020 14:39:56 -0500 Subject: [PATCH 1/5] support maxRetry --- .../google2/GoogleSubscriberInterpreter.scala | 39 ++++++++++++------- .../workbench/google2/GooglePubSubSpec.scala | 2 +- .../google2/mock/FakeGooglePublisher.scala | 3 +- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala b/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala index e8f64a213..68de0f825 100644 --- a/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala +++ b/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala @@ -21,13 +21,14 @@ import com.google.protobuf.Timestamp import scala.concurrent.duration.FiniteDuration -private[google2] class GoogleSubscriberInterpreter[F[_]: Async: Timer: ContextShift, MessageType]( +private[google2] class GoogleSubscriberInterpreter[F[_]: Timer: ContextShift, MessageType]( subscriber: Subscriber, queue: fs2.concurrent.Queue[F, Event[MessageType]] -) extends GoogleSubscriber[F, MessageType] { +)(implicit F: Async[F]) + extends GoogleSubscriber[F, MessageType] { val messages: Stream[F, Event[MessageType]] = queue.dequeue - def start: F[Unit] = Async[F].async[Unit] { callback => + def start: F[Unit] = F.async[Unit] { callback => subscriber.addListener( new ApiService.Listener() { override def failed(from: ApiService.State, failure: Throwable): Unit = @@ -41,7 +42,7 @@ private[google2] class GoogleSubscriberInterpreter[F[_]: Async: Timer: ContextSh } def stop: F[Unit] = - Async[F].async[Unit] { callback => + F.async[Unit] { callback => subscriber.addListener( new ApiService.Listener() { override def failed(from: ApiService.State, failure: Throwable): Unit = @@ -61,27 +62,34 @@ object GoogleSubscriberInterpreter { queue: fs2.concurrent.Queue[F, Event[MessageType]] ): GoogleSubscriberInterpreter[F, MessageType] = new GoogleSubscriberInterpreter[F, MessageType](subscriber, queue) - private[google2] def receiver[F[_]: Effect, MessageType: Decoder]( - queue: fs2.concurrent.Queue[F, Event[MessageType]] - )(implicit logger: StructuredLogger[F]): MessageReceiver = new MessageReceiver() { + private[google2] def receiver[F[_], MessageType: Decoder]( + queue: fs2.concurrent.Queue[F, Event[MessageType]], + maxRetries: MaxRetries + )(implicit logger: StructuredLogger[F], F: Effect[F]): MessageReceiver = new MessageReceiver() { override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { val parseEvent = for { - isJson <- Effect[F].fromEither(parse(message.getData.toStringUtf8)).attempt + isJson <- F.fromEither(parse(message.getData.toStringUtf8)).attempt msg <- isJson match { case Left(_) => - Effect[F].raiseError[MessageType](new Exception(s"${message.getData.toStringUtf8} is not a valid Json")) + F.raiseError[MessageType](new Exception(s"${message.getData.toStringUtf8} is not a valid Json")) case Right(json) => - Effect[F].fromEither(json.as[MessageType]) + F.fromEither(json.as[MessageType]) } traceId = Option(message.getAttributesMap.get("traceId")).map(s => TraceId(s)) } yield Event(msg, traceId, message.getPublishTime, consumer) + val deliveredTimes = Subscriber.getDeliveryAttempt(message) + val loggingCtx = Map( "traceId" -> Option(message.getAttributesMap.get("traceId")).getOrElse(""), - "publishTime" -> message.getPublishTime.toString + "publishTime" -> message.getPublishTime.toString, + "attempts" -> deliveredTimes.toString ) val result = for { + _ <- if (deliveredTimes > maxRetries.value) + logger.info(loggingCtx)(s"message reached maxRetry") >> F.delay(consumer.ack()) + else F.unit res <- parseEvent.attempt _ <- res match { case Right(event) => @@ -90,7 +98,7 @@ object GoogleSubscriberInterpreter { _ <- r match { case Left(e) => - logger.info(loggingCtx)(s"Subscriber fail to enqueue $message due to $e") >> Effect[F].delay( + logger.info(loggingCtx)(s"Subscriber fail to enqueue $message due to $e") >> F.delay( consumer.nack() ) //pubsub will resend the message up to ackDeadlineSeconds (this is configed during subscription creation) case Right(_) => @@ -99,7 +107,7 @@ object GoogleSubscriberInterpreter { } yield () case Left(e) => logger - .info(s"Subscriber fail to decode message ${message} due to ${e}. Going to ack the message") >> Effect[F] + .info(s"Subscriber fail to decode message ${message} due to ${e}. Going to ack the message") >> F .delay(consumer.ack()) } } yield () @@ -139,7 +147,7 @@ object GoogleSubscriberInterpreter { .setMaxOutstandingRequestBytes(config.maxOutstandingRequestBytes) .build ) - sub <- subscriberResource(queue, subscription, credential, flowControlSettings) + sub <- subscriberResource(queue, subscription, credential, subscriberConfig.maxRetries, flowControlSettings) } yield sub } @@ -169,12 +177,13 @@ object GoogleSubscriberInterpreter { queue: Queue[F, Event[MessageType]], subscription: ProjectSubscriptionName, credential: ServiceAccountCredentials, + maxRetries: MaxRetries, flowControlSettings: Option[FlowControlSettings] ): Resource[F, Subscriber] = { val subscriber = for { builder <- Sync[F].delay( Subscriber - .newBuilder(subscription, receiver(queue)) + .newBuilder(subscription, receiver(queue, maxRetries)) .setCredentialsProvider(FixedCredentialsProvider.create(credential)) ) builderWithFlowControlSetting <- flowControlSettings.traverse { fcs => diff --git a/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubSpec.scala b/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubSpec.scala index 7311bd5cb..b4eabafb3 100644 --- a/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubSpec.scala +++ b/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubSpec.scala @@ -132,7 +132,7 @@ object GooglePubSubSpec { ) )(_ => /*IO(p.shutdown()) >>*/ IO.unit) //TODO: shutdown properly. Somehow this hangs the publisher unit test subscription = ProjectSubscriptionName.of(projectTopicName.getProject, projectTopicName.getTopic) - receiver = GoogleSubscriberInterpreter.receiver(queue) + receiver = GoogleSubscriberInterpreter.receiver(queue, MaxRetries(3)) sub <- Resource.liftF( IO( Subscriber diff --git a/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/mock/FakeGooglePublisher.scala b/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/mock/FakeGooglePublisher.scala index bd820dfee..6793f796c 100644 --- a/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/mock/FakeGooglePublisher.scala +++ b/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/mock/FakeGooglePublisher.scala @@ -1,12 +1,11 @@ package org.broadinstitute.dsde.workbench.google2.mock -import cats.effect.{Concurrent, IO} +import cats.effect.IO import cats.mtl.Ask import com.google.pubsub.v1.PubsubMessage import fs2.Pipe import io.circe.Encoder import org.broadinstitute.dsde.workbench.google2.GooglePublisher -import org.broadinstitute.dsde.workbench.google2.JsonCodec._ import org.broadinstitute.dsde.workbench.model.TraceId class FakeGooglePublisher extends GooglePublisher[IO] { From 9ad1160b032f36c7ba8f18ce437fa0ca9cfcea89 Mon Sep 17 00:00:00 2001 From: Qi Wang Date: Mon, 16 Nov 2020 14:47:45 -0500 Subject: [PATCH 2/5] travis-replace-me --- README.md | 2 +- google2/CHANGELOG.md | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e3c2ff20b..0082ce7ab 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ Workbench utility libraries, built for Scala 2.12 and 2.13. You can find the ful Contains utility functions for talking to Google APIs via com.google.cloud client library (more recent) via gRPC. -Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.15-426a0c2"` +Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.15-TRAVIS-REPLACE-ME"` To start the Google PubSub emulator for unit testing: diff --git a/google2/CHANGELOG.md b/google2/CHANGELOG.md index 6c94640ab..0d0ce1b42 100644 --- a/google2/CHANGELOG.md +++ b/google2/CHANGELOG.md @@ -10,8 +10,9 @@ Added: Changed: - Upgrade `cats-mtl` to `1.0.0` +- Fix a bug where `maxRetry` in SubscriberConfig not respected -SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.15-426a0c2"` +SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.15-TRAVIS-REPLACE-ME"` ## 0.14 Changed: From 93945210bbe664626d94c3febc9f77060d4e34cb Mon Sep 17 00:00:00 2001 From: Qi Wang Date: Mon, 16 Nov 2020 17:25:52 -0500 Subject: [PATCH 3/5] tmp --- .../workbench/google2/GoogleSubscriberInterpreter.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala b/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala index 68de0f825..3f83f447e 100644 --- a/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala +++ b/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala @@ -78,18 +78,16 @@ object GoogleSubscriberInterpreter { traceId = Option(message.getAttributesMap.get("traceId")).map(s => TraceId(s)) } yield Event(msg, traceId, message.getPublishTime, consumer) - val deliveredTimes = Subscriber.getDeliveryAttempt(message) + // The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy is set on the subscription, and zero otherwise + val deliveredTimes = Option(Subscriber.getDeliveryAttempt(message)).map(_.toInt).getOrElse(-1) val loggingCtx = Map( "traceId" -> Option(message.getAttributesMap.get("traceId")).getOrElse(""), "publishTime" -> message.getPublishTime.toString, "attempts" -> deliveredTimes.toString ) - + println(loggingCtx) val result = for { - _ <- if (deliveredTimes > maxRetries.value) - logger.info(loggingCtx)(s"message reached maxRetry") >> F.delay(consumer.ack()) - else F.unit res <- parseEvent.attempt _ <- res match { case Right(event) => From 54fbbd207ad3b1cac0b561d47f52577c091a77e2 Mon Sep 17 00:00:00 2001 From: Qi Wang Date: Mon, 16 Nov 2020 10:30:56 -0500 Subject: [PATCH 4/5] attempt to support deadletter --- README.md | 2 +- google2/CHANGELOG.md | 10 ++- .../google2/GoogleSubscriberInterpreter.scala | 67 ++++++++++--------- .../google2/GooglePubSubMannualTest.scala | 2 +- .../workbench/google2/GooglePubSubSpec.scala | 2 +- project/Settings.scala | 2 +- 6 files changed, 49 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 0082ce7ab..b5131335a 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ Workbench utility libraries, built for Scala 2.12 and 2.13. You can find the ful Contains utility functions for talking to Google APIs via com.google.cloud client library (more recent) via gRPC. -Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.15-TRAVIS-REPLACE-ME"` +Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.16-TRAVIS-REPLACE-ME"` To start the Google PubSub emulator for unit testing: diff --git a/google2/CHANGELOG.md b/google2/CHANGELOG.md index 0d0ce1b42..19a9b1616 100644 --- a/google2/CHANGELOG.md +++ b/google2/CHANGELOG.md @@ -2,6 +2,13 @@ This file documents changes to the `workbench-google2` library, including notes on how to upgrade to new versions. +## 0.16 + +Changed: +- Add `subscriptionName: Option[ProjectSubscriptionName]` and `deadLetterPolicy: Option[SubscriberDeadLetterPolicy]` to `SubscriberConfig` + +SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.16-TRAVIS-REPLACE-ME"` + ## 0.15 Added: @@ -10,9 +17,8 @@ Added: Changed: - Upgrade `cats-mtl` to `1.0.0` -- Fix a bug where `maxRetry` in SubscriberConfig not respected -SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.15-TRAVIS-REPLACE-ME"` +SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.15-426a0c2"` ## 0.14 Changed: diff --git a/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala b/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala index 3f83f447e..a1967f8c6 100644 --- a/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala +++ b/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala @@ -10,6 +10,7 @@ import com.google.api.gax.rpc.AlreadyExistsException import com.google.auth.oauth2.ServiceAccountCredentials import com.google.cloud.pubsub.v1._ import com.google.common.util.concurrent.MoreExecutors +import com.google.protobuf.Timestamp import com.google.pubsub.v1.{PubsubMessage, _} import fs2.Stream import fs2.concurrent.Queue @@ -17,7 +18,6 @@ import io.chrisdavenport.log4cats.{Logger, StructuredLogger} import io.circe.Decoder import io.circe.parser._ import org.broadinstitute.dsde.workbench.model.TraceId -import com.google.protobuf.Timestamp import scala.concurrent.duration.FiniteDuration @@ -63,8 +63,7 @@ object GoogleSubscriberInterpreter { ): GoogleSubscriberInterpreter[F, MessageType] = new GoogleSubscriberInterpreter[F, MessageType](subscriber, queue) private[google2] def receiver[F[_], MessageType: Decoder]( - queue: fs2.concurrent.Queue[F, Event[MessageType]], - maxRetries: MaxRetries + queue: fs2.concurrent.Queue[F, Event[MessageType]] )(implicit logger: StructuredLogger[F], F: Effect[F]): MessageReceiver = new MessageReceiver() { override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { val parseEvent = for { @@ -81,12 +80,6 @@ object GoogleSubscriberInterpreter { // The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy is set on the subscription, and zero otherwise val deliveredTimes = Option(Subscriber.getDeliveryAttempt(message)).map(_.toInt).getOrElse(-1) - val loggingCtx = Map( - "traceId" -> Option(message.getAttributesMap.get("traceId")).getOrElse(""), - "publishTime" -> message.getPublishTime.toString, - "attempts" -> deliveredTimes.toString - ) - println(loggingCtx) val result = for { res <- parseEvent.attempt _ <- res match { @@ -96,11 +89,11 @@ object GoogleSubscriberInterpreter { _ <- r match { case Left(e) => - logger.info(loggingCtx)(s"Subscriber fail to enqueue $message due to $e") >> F.delay( + logger.info(s"Subscriber fail to enqueue $message due to $e") >> F.delay( consumer.nack() ) //pubsub will resend the message up to ackDeadlineSeconds (this is configed during subscription creation) case Right(_) => - logger.info(loggingCtx)(s"Subscriber Successfully received $message.") + logger.info(s"Subscriber Successfully received $message.") } } yield () case Left(e) => @@ -131,8 +124,9 @@ object GoogleSubscriberInterpreter { subscriberConfig: SubscriberConfig, queue: fs2.concurrent.Queue[F, Event[MessageType]] ): Resource[F, Subscriber] = { - val subscription = + val subscription = subscriberConfig.subscriptionName.getOrElse( ProjectSubscriptionName.of(subscriberConfig.topicName.getProject, subscriberConfig.topicName.getTopic) + ) for { credential <- credentialResource(subscriberConfig.pathToCredentialJson) @@ -145,7 +139,7 @@ object GoogleSubscriberInterpreter { .setMaxOutstandingRequestBytes(config.maxOutstandingRequestBytes) .build ) - sub <- subscriberResource(queue, subscription, credential, subscriberConfig.maxRetries, flowControlSettings) + sub <- subscriberResource(queue, subscription, credential, flowControlSettings) } yield sub } @@ -153,8 +147,9 @@ object GoogleSubscriberInterpreter { subscriberConfig: SubscriberConfig, queue: fs2.concurrent.Queue[F, Event[String]] ): Resource[F, Subscriber] = { - val subscription = + val subscription = subscriberConfig.subscriptionName.getOrElse( ProjectSubscriptionName.of(subscriberConfig.topicName.getProject, subscriberConfig.topicName.getTopic) + ) for { credential <- credentialResource(subscriberConfig.pathToCredentialJson) @@ -175,13 +170,12 @@ object GoogleSubscriberInterpreter { queue: Queue[F, Event[MessageType]], subscription: ProjectSubscriptionName, credential: ServiceAccountCredentials, - maxRetries: MaxRetries, flowControlSettings: Option[FlowControlSettings] ): Resource[F, Subscriber] = { val subscriber = for { builder <- Sync[F].delay( Subscriber - .newBuilder(subscription, receiver(queue, maxRetries)) + .newBuilder(subscription, receiver(queue)) .setCredentialsProvider(FixedCredentialsProvider.create(credential)) ) builderWithFlowControlSetting <- flowControlSettings.traverse { fcs => @@ -213,25 +207,33 @@ object GoogleSubscriberInterpreter { } private def createSubscription[F[_]: Effect: Logger]( - subsriberConfig: SubscriberConfig, + subscriberConfig: SubscriberConfig, subscription: ProjectSubscriptionName, subscriptionAdminClient: SubscriptionAdminClient ): Resource[F, Unit] = { val sub = Subscription .newBuilder() .setName(subscription.toString) - .setTopic(subsriberConfig.topicName.toString) + .setTopic(subscriberConfig.topicName.toString) .setPushConfig(PushConfig.getDefaultInstance) - .setAckDeadlineSeconds(subsriberConfig.ackDeadLine.toSeconds.toInt) -// Comment this out since this causes this error: INVALID_ARGUMENT: Invalid resource name given (name=). Refer to https://cloud.google.com/pubsub/docs/admin#resource_names for more information -// .setDeadLetterPolicy( -// DeadLetterPolicy.newBuilder().setMaxDeliveryAttempts(subsriberConfig.maxRetries.value).build() -// ) - .build() + .setAckDeadlineSeconds(subscriberConfig.ackDeadLine.toSeconds.toInt) + + val subWithDeadLetterPolicy = subscriberConfig.deadLetterPolicy.fold(sub.build()) { deadLetterPolicy => + sub + .setDeadLetterPolicy( + DeadLetterPolicy + .newBuilder() + .setDeadLetterTopic(deadLetterPolicy.topicName.toString) + .setMaxDeliveryAttempts(deadLetterPolicy.maxRetries.value) + .build() + ) + .build() + } + Resource.liftF( Async[F] .delay( - subscriptionAdminClient.createSubscription(sub) + subscriptionAdminClient.createSubscription(subWithDeadLetterPolicy) ) .void .recover { @@ -254,10 +256,15 @@ object GoogleSubscriberInterpreter { } final case class FlowControlSettingsConfig(maxOutstandingElementCount: Long, maxOutstandingRequestBytes: Long) -final case class SubscriberConfig(pathToCredentialJson: String, - topicName: TopicName, - ackDeadLine: FiniteDuration, - maxRetries: MaxRetries, - flowControlSettingsConfig: Option[FlowControlSettingsConfig]) +final case class SubscriberConfig( + pathToCredentialJson: String, + topicName: TopicName, + subscriptionName: Option[ProjectSubscriptionName], //it'll have the same name as topic if this is None + ackDeadLine: FiniteDuration, + deadLetterPolicy: Option[SubscriberDeadLetterPolicy], + flowControlSettingsConfig: Option[FlowControlSettingsConfig] +) final case class MaxRetries(value: Int) extends AnyVal +final case class SubscriberDeadLetterPolicy(topicName: TopicName, maxRetries: MaxRetries) + final case class Event[A](msg: A, traceId: Option[TraceId] = None, publishedTime: Timestamp, consumer: AckReplyConsumer) diff --git a/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubMannualTest.scala b/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubMannualTest.scala index 23e831dd4..278bff0f9 100644 --- a/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubMannualTest.scala +++ b/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubMannualTest.scala @@ -52,7 +52,7 @@ object GooglePubSubMannualTest { * You can now publish messages in console and watch messages being printed out */ def subscriber() = { - val config = SubscriberConfig(path, projectTopicName, 1 minute, MaxRetries(10), None) + val config = SubscriberConfig(path, projectTopicName, None, 1 minute, None, None) for { queue <- InspectableQueue.bounded[IO, Event[Messagee]](100) sub = GoogleSubscriber.resource[IO, Messagee](config, queue) diff --git a/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubSpec.scala b/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubSpec.scala index b4eabafb3..7311bd5cb 100644 --- a/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubSpec.scala +++ b/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubSpec.scala @@ -132,7 +132,7 @@ object GooglePubSubSpec { ) )(_ => /*IO(p.shutdown()) >>*/ IO.unit) //TODO: shutdown properly. Somehow this hangs the publisher unit test subscription = ProjectSubscriptionName.of(projectTopicName.getProject, projectTopicName.getTopic) - receiver = GoogleSubscriberInterpreter.receiver(queue, MaxRetries(3)) + receiver = GoogleSubscriberInterpreter.receiver(queue) sub <- Resource.liftF( IO( Subscriber diff --git a/project/Settings.scala b/project/Settings.scala index db9f59d81..47c59b4de 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -157,7 +157,7 @@ object Settings { val google2Settings = cross212and213 ++ commonSettings ++ List( name := "workbench-google2", libraryDependencies ++= google2Dependencies, - version := createVersion("0.15") + version := createVersion("0.16") ) ++ publishSettings val openTelemetrySettings = cross212and213 ++ commonSettings ++ List( From d9f3bcb4f9a3281013a23c7dbefd3ab65b573cd9 Mon Sep 17 00:00:00 2001 From: Qi Wang Date: Mon, 16 Nov 2020 18:11:33 -0500 Subject: [PATCH 5/5] fix comment --- .../dsde/workbench/google2/GoogleSubscriberInterpreter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala b/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala index a1967f8c6..80780606c 100644 --- a/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala +++ b/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala @@ -77,7 +77,7 @@ object GoogleSubscriberInterpreter { traceId = Option(message.getAttributesMap.get("traceId")).map(s => TraceId(s)) } yield Event(msg, traceId, message.getPublishTime, consumer) - // The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy is set on the subscription, and zero otherwise + // The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy is set on the subscription, and null otherwise val deliveredTimes = Option(Subscriber.getDeliveryAttempt(message)).map(_.toInt).getOrElse(-1) val result = for {