-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[IA-2355] support maxRetry #372
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,24 +10,25 @@ import com.google.api.gax.rpc.AlreadyExistsException | |
import com.google.auth.oauth2.ServiceAccountCredentials | ||
import com.google.cloud.pubsub.v1._ | ||
import com.google.common.util.concurrent.MoreExecutors | ||
import com.google.protobuf.Timestamp | ||
import com.google.pubsub.v1.{PubsubMessage, _} | ||
import fs2.Stream | ||
import fs2.concurrent.Queue | ||
import io.chrisdavenport.log4cats.{Logger, StructuredLogger} | ||
import io.circe.Decoder | ||
import io.circe.parser._ | ||
import org.broadinstitute.dsde.workbench.model.TraceId | ||
import com.google.protobuf.Timestamp | ||
|
||
import scala.concurrent.duration.FiniteDuration | ||
|
||
private[google2] class GoogleSubscriberInterpreter[F[_]: Async: Timer: ContextShift, MessageType]( | ||
private[google2] class GoogleSubscriberInterpreter[F[_]: Timer: ContextShift, MessageType]( | ||
subscriber: Subscriber, | ||
queue: fs2.concurrent.Queue[F, Event[MessageType]] | ||
) extends GoogleSubscriber[F, MessageType] { | ||
)(implicit F: Async[F]) | ||
extends GoogleSubscriber[F, MessageType] { | ||
val messages: Stream[F, Event[MessageType]] = queue.dequeue | ||
|
||
def start: F[Unit] = Async[F].async[Unit] { callback => | ||
def start: F[Unit] = F.async[Unit] { callback => | ||
subscriber.addListener( | ||
new ApiService.Listener() { | ||
override def failed(from: ApiService.State, failure: Throwable): Unit = | ||
|
@@ -41,7 +42,7 @@ private[google2] class GoogleSubscriberInterpreter[F[_]: Async: Timer: ContextSh | |
} | ||
|
||
def stop: F[Unit] = | ||
Async[F].async[Unit] { callback => | ||
F.async[Unit] { callback => | ||
subscriber.addListener( | ||
new ApiService.Listener() { | ||
override def failed(from: ApiService.State, failure: Throwable): Unit = | ||
|
@@ -61,25 +62,23 @@ object GoogleSubscriberInterpreter { | |
queue: fs2.concurrent.Queue[F, Event[MessageType]] | ||
): GoogleSubscriberInterpreter[F, MessageType] = new GoogleSubscriberInterpreter[F, MessageType](subscriber, queue) | ||
|
||
private[google2] def receiver[F[_]: Effect, MessageType: Decoder]( | ||
private[google2] def receiver[F[_], MessageType: Decoder]( | ||
queue: fs2.concurrent.Queue[F, Event[MessageType]] | ||
)(implicit logger: StructuredLogger[F]): MessageReceiver = new MessageReceiver() { | ||
)(implicit logger: StructuredLogger[F], F: Effect[F]): MessageReceiver = new MessageReceiver() { | ||
override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { | ||
val parseEvent = for { | ||
isJson <- Effect[F].fromEither(parse(message.getData.toStringUtf8)).attempt | ||
isJson <- F.fromEither(parse(message.getData.toStringUtf8)).attempt | ||
msg <- isJson match { | ||
case Left(_) => | ||
Effect[F].raiseError[MessageType](new Exception(s"${message.getData.toStringUtf8} is not a valid Json")) | ||
F.raiseError[MessageType](new Exception(s"${message.getData.toStringUtf8} is not a valid Json")) | ||
case Right(json) => | ||
Effect[F].fromEither(json.as[MessageType]) | ||
F.fromEither(json.as[MessageType]) | ||
} | ||
traceId = Option(message.getAttributesMap.get("traceId")).map(s => TraceId(s)) | ||
} yield Event(msg, traceId, message.getPublishTime, consumer) | ||
|
||
val loggingCtx = Map( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this context not useful to log, or is it logged in another way? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when we log the |
||
"traceId" -> Option(message.getAttributesMap.get("traceId")).getOrElse(""), | ||
"publishTime" -> message.getPublishTime.toString | ||
) | ||
// The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy is set on the subscription, and null otherwise | ||
val deliveredTimes = Option(Subscriber.getDeliveryAttempt(message)).map(_.toInt).getOrElse(-1) | ||
|
||
val result = for { | ||
res <- parseEvent.attempt | ||
|
@@ -90,16 +89,16 @@ object GoogleSubscriberInterpreter { | |
|
||
_ <- r match { | ||
case Left(e) => | ||
logger.info(loggingCtx)(s"Subscriber fail to enqueue $message due to $e") >> Effect[F].delay( | ||
logger.info(s"Subscriber fail to enqueue $message due to $e") >> F.delay( | ||
consumer.nack() | ||
) //pubsub will resend the message up to ackDeadlineSeconds (this is configed during subscription creation) | ||
case Right(_) => | ||
logger.info(loggingCtx)(s"Subscriber Successfully received $message.") | ||
logger.info(s"Subscriber Successfully received $message.") | ||
} | ||
} yield () | ||
case Left(e) => | ||
logger | ||
.info(s"Subscriber fail to decode message ${message} due to ${e}. Going to ack the message") >> Effect[F] | ||
.info(s"Subscriber fail to decode message ${message} due to ${e}. Going to ack the message") >> F | ||
.delay(consumer.ack()) | ||
} | ||
} yield () | ||
|
@@ -125,8 +124,9 @@ object GoogleSubscriberInterpreter { | |
subscriberConfig: SubscriberConfig, | ||
queue: fs2.concurrent.Queue[F, Event[MessageType]] | ||
): Resource[F, Subscriber] = { | ||
val subscription = | ||
val subscription = subscriberConfig.subscriptionName.getOrElse( | ||
ProjectSubscriptionName.of(subscriberConfig.topicName.getProject, subscriberConfig.topicName.getTopic) | ||
) | ||
|
||
for { | ||
credential <- credentialResource(subscriberConfig.pathToCredentialJson) | ||
|
@@ -147,8 +147,9 @@ object GoogleSubscriberInterpreter { | |
subscriberConfig: SubscriberConfig, | ||
queue: fs2.concurrent.Queue[F, Event[String]] | ||
): Resource[F, Subscriber] = { | ||
val subscription = | ||
val subscription = subscriberConfig.subscriptionName.getOrElse( | ||
ProjectSubscriptionName.of(subscriberConfig.topicName.getProject, subscriberConfig.topicName.getTopic) | ||
) | ||
|
||
for { | ||
credential <- credentialResource(subscriberConfig.pathToCredentialJson) | ||
|
@@ -206,25 +207,33 @@ object GoogleSubscriberInterpreter { | |
} | ||
|
||
private def createSubscription[F[_]: Effect: Logger]( | ||
subsriberConfig: SubscriberConfig, | ||
subscriberConfig: SubscriberConfig, | ||
subscription: ProjectSubscriptionName, | ||
subscriptionAdminClient: SubscriptionAdminClient | ||
): Resource[F, Unit] = { | ||
val sub = Subscription | ||
.newBuilder() | ||
.setName(subscription.toString) | ||
.setTopic(subsriberConfig.topicName.toString) | ||
.setTopic(subscriberConfig.topicName.toString) | ||
.setPushConfig(PushConfig.getDefaultInstance) | ||
.setAckDeadlineSeconds(subsriberConfig.ackDeadLine.toSeconds.toInt) | ||
// Comment this out since this causes this error: INVALID_ARGUMENT: Invalid resource name given (name=). Refer to https://cloud.google.com/pubsub/docs/admin#resource_names for more information | ||
// .setDeadLetterPolicy( | ||
// DeadLetterPolicy.newBuilder().setMaxDeliveryAttempts(subsriberConfig.maxRetries.value).build() | ||
// ) | ||
.build() | ||
.setAckDeadlineSeconds(subscriberConfig.ackDeadLine.toSeconds.toInt) | ||
|
||
val subWithDeadLetterPolicy = subscriberConfig.deadLetterPolicy.fold(sub.build()) { deadLetterPolicy => | ||
sub | ||
.setDeadLetterPolicy( | ||
DeadLetterPolicy | ||
.newBuilder() | ||
.setDeadLetterTopic(deadLetterPolicy.topicName.toString) | ||
.setMaxDeliveryAttempts(deadLetterPolicy.maxRetries.value) | ||
.build() | ||
) | ||
.build() | ||
} | ||
|
||
Resource.liftF( | ||
Async[F] | ||
.delay( | ||
subscriptionAdminClient.createSubscription(sub) | ||
subscriptionAdminClient.createSubscription(subWithDeadLetterPolicy) | ||
) | ||
.void | ||
.recover { | ||
|
@@ -247,10 +256,15 @@ object GoogleSubscriberInterpreter { | |
} | ||
|
||
final case class FlowControlSettingsConfig(maxOutstandingElementCount: Long, maxOutstandingRequestBytes: Long) | ||
final case class SubscriberConfig(pathToCredentialJson: String, | ||
topicName: TopicName, | ||
ackDeadLine: FiniteDuration, | ||
maxRetries: MaxRetries, | ||
flowControlSettingsConfig: Option[FlowControlSettingsConfig]) | ||
final case class SubscriberConfig( | ||
pathToCredentialJson: String, | ||
topicName: TopicName, | ||
subscriptionName: Option[ProjectSubscriptionName], //it'll have the same name as topic if this is None | ||
ackDeadLine: FiniteDuration, | ||
deadLetterPolicy: Option[SubscriberDeadLetterPolicy], | ||
flowControlSettingsConfig: Option[FlowControlSettingsConfig] | ||
) | ||
final case class MaxRetries(value: Int) extends AnyVal | ||
final case class SubscriberDeadLetterPolicy(topicName: TopicName, maxRetries: MaxRetries) | ||
|
||
final case class Event[A](msg: A, traceId: Option[TraceId] = None, publishedTime: Timestamp, consumer: AckReplyConsumer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious - since these are optional could it be a non-breaking change if they default to
None
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah yea, I guess (I'm not entirely sure by providing default is backward compatible since I vaguely remember I had to fix things after a PR that added optional param with default...)...I tend to not giving default just so caller have to think about these options. But I'm happy to give default here..also This has more points on why providing defaults can be bad
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I don't love defaults either, was just curious. I don't really have a strong preference, will thumb as is :)