Skip to content

Commit

Permalink
Implement queue update for PubSub
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed May 23, 2024
1 parent 41d11c0 commit c3b129a
Showing 1 changed file with 90 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import com.commercetools.queue.QueueAdministration
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.rpc.{NotFoundException, TransportChannelProvider}
import com.google.cloud.pubsub.v1.{SubscriptionAdminClient, SubscriptionAdminSettings, TopicAdminClient, TopicAdminSettings}
import com.google.protobuf.Duration
import com.google.pubsub.v1.{DeleteSubscriptionRequest, DeleteTopicRequest, ExpirationPolicy, GetTopicRequest, Subscription, SubscriptionName, Topic, TopicName}
import com.google.protobuf.{Duration, FieldMask}
import com.google.pubsub.v1.{DeleteSubscriptionRequest, DeleteTopicRequest, ExpirationPolicy, GetTopicRequest, Subscription, SubscriptionName, Topic, TopicName, UpdateSubscriptionRequest, UpdateTopicRequest}

import scala.concurrent.duration.FiniteDuration

Expand Down Expand Up @@ -81,7 +81,94 @@ class PubSubAdministration[F[_]](
}
.adaptError(makeQueueException(_, name))

override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = ???
override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = {
val topicName = TopicName.of(project, name)
val subscriptionName = SubscriptionName.of(project, s"fs2-queue-$name")
val (updateTopicRequest, updateSubscriptionRequest) =
(messageTTL, lockTTL) match {
case (Some(messageTTL), Some(lockTTL)) =>
val mttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build()
(
Some(
UpdateTopicRequest
.newBuilder()
.setTopic(Topic.newBuilder().setName(topicName.toString()).setMessageRetentionDuration(mttl).build())
.setUpdateMask(FieldMask.newBuilder().addPaths("message_retention_duration").build())
.build()),
Some(
UpdateSubscriptionRequest
.newBuilder()
.setSubscription(
Subscription
.newBuilder()
.setTopic(topicName.toString())
.setName(subscriptionName.toString())
.setMessageRetentionDuration(mttl)
.setAckDeadlineSeconds(lockTTL.toSeconds.toInt)
.build())
.setUpdateMask(
FieldMask
.newBuilder()
.addPaths("message_retention_duration")
.addPaths("ack_deadline_seconds")
.build())
.build()))
case (Some(messageTTL), None) =>
val mttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build()
(
Some(
UpdateTopicRequest
.newBuilder()
.setTopic(Topic.newBuilder().setName(topicName.toString()).setMessageRetentionDuration(mttl).build())
.setUpdateMask(FieldMask.newBuilder().addPaths("message_retention_duration").build())
.build()),
Some(
UpdateSubscriptionRequest
.newBuilder()
.setSubscription(
Subscription
.newBuilder()
.setTopic(topicName.toString())
.setName(subscriptionName.toString())
.setMessageRetentionDuration(mttl)
.build())
.setUpdateMask(FieldMask
.newBuilder()
.addPaths("message_retention_duration")
.build())
.build()))
case (None, Some(lockTTL)) =>
(
None,
Some(
UpdateSubscriptionRequest
.newBuilder()
.setSubscription(
Subscription
.newBuilder()
.setTopic(topicName.toString())
.setName(subscriptionName.toString())
.setAckDeadlineSeconds(lockTTL.toSeconds.toInt)
.build())
.setUpdateMask(FieldMask
.newBuilder()
.addPaths("ack_deadline_seconds")
.build())
.build()))
case (None, None) =>
(None, None)
}
updateTopicRequest.traverse_ { req =>
adminClient.use { client =>
wrapFuture(F.delay(client.updateTopicCallable().futureCall(req)))
}
} *>
updateSubscriptionRequest.traverse_ { req =>
subscriptionClient.use { client =>
wrapFuture(F.delay(client.updateSubscriptionCallable().futureCall(req)))
}
}
}

override def delete(name: String): F[Unit] = {
adminClient.use { client =>
Expand Down

0 comments on commit c3b129a

Please sign in to comment.