diff --git a/README.md b/README.md index e3c2ff20b..b5131335a 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ Workbench utility libraries, built for Scala 2.12 and 2.13. You can find the ful Contains utility functions for talking to Google APIs via com.google.cloud client library (more recent) via gRPC. -Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.15-426a0c2"` +Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.16-TRAVIS-REPLACE-ME"` To start the Google PubSub emulator for unit testing: diff --git a/google2/CHANGELOG.md b/google2/CHANGELOG.md index 6c94640ab..19a9b1616 100644 --- a/google2/CHANGELOG.md +++ b/google2/CHANGELOG.md @@ -2,6 +2,13 @@ This file documents changes to the `workbench-google2` library, including notes on how to upgrade to new versions. +## 0.16 + +Changed: +- Add `subscriptionName: Option[ProjectSubscriptionName]` and `deadLetterPolicy: Option[SubscriberDeadLetterPolicy]` to `SubscriberConfig` + +SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.16-TRAVIS-REPLACE-ME"` + ## 0.15 Added: diff --git a/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala b/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala index e8f64a213..80780606c 100644 --- a/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala +++ b/google2/src/main/scala/org/broadinstitute/dsde/workbench/google2/GoogleSubscriberInterpreter.scala @@ -10,6 +10,7 @@ import com.google.api.gax.rpc.AlreadyExistsException import com.google.auth.oauth2.ServiceAccountCredentials import com.google.cloud.pubsub.v1._ import com.google.common.util.concurrent.MoreExecutors +import com.google.protobuf.Timestamp import com.google.pubsub.v1.{PubsubMessage, _} import fs2.Stream import fs2.concurrent.Queue @@ -17,17 +18,17 @@ import io.chrisdavenport.log4cats.{Logger, StructuredLogger} import io.circe.Decoder import io.circe.parser._ import org.broadinstitute.dsde.workbench.model.TraceId -import com.google.protobuf.Timestamp import scala.concurrent.duration.FiniteDuration -private[google2] class GoogleSubscriberInterpreter[F[_]: Async: Timer: ContextShift, MessageType]( +private[google2] class GoogleSubscriberInterpreter[F[_]: Timer: ContextShift, MessageType]( subscriber: Subscriber, queue: fs2.concurrent.Queue[F, Event[MessageType]] -) extends GoogleSubscriber[F, MessageType] { +)(implicit F: Async[F]) + extends GoogleSubscriber[F, MessageType] { val messages: Stream[F, Event[MessageType]] = queue.dequeue - def start: F[Unit] = Async[F].async[Unit] { callback => + def start: F[Unit] = F.async[Unit] { callback => subscriber.addListener( new ApiService.Listener() { override def failed(from: ApiService.State, failure: Throwable): Unit = @@ -41,7 +42,7 @@ private[google2] class GoogleSubscriberInterpreter[F[_]: Async: Timer: ContextSh } def stop: F[Unit] = - Async[F].async[Unit] { callback => + F.async[Unit] { callback => subscriber.addListener( new ApiService.Listener() { override def failed(from: ApiService.State, failure: Throwable): Unit = @@ -61,25 +62,23 @@ object GoogleSubscriberInterpreter { queue: fs2.concurrent.Queue[F, Event[MessageType]] ): GoogleSubscriberInterpreter[F, MessageType] = new GoogleSubscriberInterpreter[F, MessageType](subscriber, queue) - private[google2] def receiver[F[_]: Effect, MessageType: Decoder]( + private[google2] def receiver[F[_], MessageType: Decoder]( queue: fs2.concurrent.Queue[F, Event[MessageType]] - )(implicit logger: StructuredLogger[F]): MessageReceiver = new MessageReceiver() { + )(implicit logger: StructuredLogger[F], F: Effect[F]): MessageReceiver = new MessageReceiver() { override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { val parseEvent = for { - isJson <- Effect[F].fromEither(parse(message.getData.toStringUtf8)).attempt + isJson <- F.fromEither(parse(message.getData.toStringUtf8)).attempt msg <- isJson match { case Left(_) => - Effect[F].raiseError[MessageType](new Exception(s"${message.getData.toStringUtf8} is not a valid Json")) + F.raiseError[MessageType](new Exception(s"${message.getData.toStringUtf8} is not a valid Json")) case Right(json) => - Effect[F].fromEither(json.as[MessageType]) + F.fromEither(json.as[MessageType]) } traceId = Option(message.getAttributesMap.get("traceId")).map(s => TraceId(s)) } yield Event(msg, traceId, message.getPublishTime, consumer) - val loggingCtx = Map( - "traceId" -> Option(message.getAttributesMap.get("traceId")).getOrElse(""), - "publishTime" -> message.getPublishTime.toString - ) + // The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy is set on the subscription, and null otherwise + val deliveredTimes = Option(Subscriber.getDeliveryAttempt(message)).map(_.toInt).getOrElse(-1) val result = for { res <- parseEvent.attempt @@ -90,16 +89,16 @@ object GoogleSubscriberInterpreter { _ <- r match { case Left(e) => - logger.info(loggingCtx)(s"Subscriber fail to enqueue $message due to $e") >> Effect[F].delay( + logger.info(s"Subscriber fail to enqueue $message due to $e") >> F.delay( consumer.nack() ) //pubsub will resend the message up to ackDeadlineSeconds (this is configed during subscription creation) case Right(_) => - logger.info(loggingCtx)(s"Subscriber Successfully received $message.") + logger.info(s"Subscriber Successfully received $message.") } } yield () case Left(e) => logger - .info(s"Subscriber fail to decode message ${message} due to ${e}. Going to ack the message") >> Effect[F] + .info(s"Subscriber fail to decode message ${message} due to ${e}. Going to ack the message") >> F .delay(consumer.ack()) } } yield () @@ -125,8 +124,9 @@ object GoogleSubscriberInterpreter { subscriberConfig: SubscriberConfig, queue: fs2.concurrent.Queue[F, Event[MessageType]] ): Resource[F, Subscriber] = { - val subscription = + val subscription = subscriberConfig.subscriptionName.getOrElse( ProjectSubscriptionName.of(subscriberConfig.topicName.getProject, subscriberConfig.topicName.getTopic) + ) for { credential <- credentialResource(subscriberConfig.pathToCredentialJson) @@ -147,8 +147,9 @@ object GoogleSubscriberInterpreter { subscriberConfig: SubscriberConfig, queue: fs2.concurrent.Queue[F, Event[String]] ): Resource[F, Subscriber] = { - val subscription = + val subscription = subscriberConfig.subscriptionName.getOrElse( ProjectSubscriptionName.of(subscriberConfig.topicName.getProject, subscriberConfig.topicName.getTopic) + ) for { credential <- credentialResource(subscriberConfig.pathToCredentialJson) @@ -206,25 +207,33 @@ object GoogleSubscriberInterpreter { } private def createSubscription[F[_]: Effect: Logger]( - subsriberConfig: SubscriberConfig, + subscriberConfig: SubscriberConfig, subscription: ProjectSubscriptionName, subscriptionAdminClient: SubscriptionAdminClient ): Resource[F, Unit] = { val sub = Subscription .newBuilder() .setName(subscription.toString) - .setTopic(subsriberConfig.topicName.toString) + .setTopic(subscriberConfig.topicName.toString) .setPushConfig(PushConfig.getDefaultInstance) - .setAckDeadlineSeconds(subsriberConfig.ackDeadLine.toSeconds.toInt) -// Comment this out since this causes this error: INVALID_ARGUMENT: Invalid resource name given (name=). Refer to https://cloud.google.com/pubsub/docs/admin#resource_names for more information -// .setDeadLetterPolicy( -// DeadLetterPolicy.newBuilder().setMaxDeliveryAttempts(subsriberConfig.maxRetries.value).build() -// ) - .build() + .setAckDeadlineSeconds(subscriberConfig.ackDeadLine.toSeconds.toInt) + + val subWithDeadLetterPolicy = subscriberConfig.deadLetterPolicy.fold(sub.build()) { deadLetterPolicy => + sub + .setDeadLetterPolicy( + DeadLetterPolicy + .newBuilder() + .setDeadLetterTopic(deadLetterPolicy.topicName.toString) + .setMaxDeliveryAttempts(deadLetterPolicy.maxRetries.value) + .build() + ) + .build() + } + Resource.liftF( Async[F] .delay( - subscriptionAdminClient.createSubscription(sub) + subscriptionAdminClient.createSubscription(subWithDeadLetterPolicy) ) .void .recover { @@ -247,10 +256,15 @@ object GoogleSubscriberInterpreter { } final case class FlowControlSettingsConfig(maxOutstandingElementCount: Long, maxOutstandingRequestBytes: Long) -final case class SubscriberConfig(pathToCredentialJson: String, - topicName: TopicName, - ackDeadLine: FiniteDuration, - maxRetries: MaxRetries, - flowControlSettingsConfig: Option[FlowControlSettingsConfig]) +final case class SubscriberConfig( + pathToCredentialJson: String, + topicName: TopicName, + subscriptionName: Option[ProjectSubscriptionName], //it'll have the same name as topic if this is None + ackDeadLine: FiniteDuration, + deadLetterPolicy: Option[SubscriberDeadLetterPolicy], + flowControlSettingsConfig: Option[FlowControlSettingsConfig] +) final case class MaxRetries(value: Int) extends AnyVal +final case class SubscriberDeadLetterPolicy(topicName: TopicName, maxRetries: MaxRetries) + final case class Event[A](msg: A, traceId: Option[TraceId] = None, publishedTime: Timestamp, consumer: AckReplyConsumer) diff --git a/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubMannualTest.scala b/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubMannualTest.scala index 23e831dd4..278bff0f9 100644 --- a/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubMannualTest.scala +++ b/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/GooglePubSubMannualTest.scala @@ -52,7 +52,7 @@ object GooglePubSubMannualTest { * You can now publish messages in console and watch messages being printed out */ def subscriber() = { - val config = SubscriberConfig(path, projectTopicName, 1 minute, MaxRetries(10), None) + val config = SubscriberConfig(path, projectTopicName, None, 1 minute, None, None) for { queue <- InspectableQueue.bounded[IO, Event[Messagee]](100) sub = GoogleSubscriber.resource[IO, Messagee](config, queue) diff --git a/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/mock/FakeGooglePublisher.scala b/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/mock/FakeGooglePublisher.scala index bd820dfee..6793f796c 100644 --- a/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/mock/FakeGooglePublisher.scala +++ b/google2/src/test/scala/org/broadinstitute/dsde/workbench/google2/mock/FakeGooglePublisher.scala @@ -1,12 +1,11 @@ package org.broadinstitute.dsde.workbench.google2.mock -import cats.effect.{Concurrent, IO} +import cats.effect.IO import cats.mtl.Ask import com.google.pubsub.v1.PubsubMessage import fs2.Pipe import io.circe.Encoder import org.broadinstitute.dsde.workbench.google2.GooglePublisher -import org.broadinstitute.dsde.workbench.google2.JsonCodec._ import org.broadinstitute.dsde.workbench.model.TraceId class FakeGooglePublisher extends GooglePublisher[IO] { diff --git a/project/Settings.scala b/project/Settings.scala index db9f59d81..47c59b4de 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -157,7 +157,7 @@ object Settings { val google2Settings = cross212and213 ++ commonSettings ++ List( name := "workbench-google2", libraryDependencies ++= google2Dependencies, - version := createVersion("0.15") + version := createVersion("0.16") ) ++ publishSettings val openTelemetrySettings = cross212and213 ++ commonSettings ++ List(