From 6f562454e27fd6a387c3718fce64a9b212b84c83 Mon Sep 17 00:00:00 2001 From: limingnihao Date: Fri, 18 Jun 2021 09:15:14 +0800 Subject: [PATCH] Add command topic Delayed Delivery Policies. (streamnative/pulsarctl#246) (#374) Add command topic Delayed Delivery Policies: - pulsarctl topics get-delayed-delivery [topic] - pulsarctl topics set-delayed-delivery [topic] -t 22s -e - pulsarctl topics remove-delayed-delivery [topic] --- pulsaradmin/pkg/pulsar/topic.go | 26 ++++++++++++++++++++++++++ pulsaradmin/pkg/pulsar/utils/data.go | 11 +++++++++++ 2 files changed, 37 insertions(+) diff --git a/pulsaradmin/pkg/pulsar/topic.go b/pulsaradmin/pkg/pulsar/topic.go index 54b0e8cb3d..a92e8a8d01 100644 --- a/pulsaradmin/pkg/pulsar/topic.go +++ b/pulsaradmin/pkg/pulsar/topic.go @@ -152,6 +152,15 @@ type Topics interface { // RemovePersistence Remove the persistence policies for a topic RemovePersistence(utils.TopicName) error + + // GetDelayedDelivery Get the delayed delivery policy for a topic + GetDelayedDelivery(utils.TopicName) (*utils.DelayedDeliveryData, error) + + // SetDelayedDelivery Set the delayed delivery policy on a topic + SetDelayedDelivery(utils.TopicName, utils.DelayedDeliveryData) error + + // RemoveDelayedDelivery Remove the delayed delivery policy on a topic + RemoveDelayedDelivery(utils.TopicName) error } type topics struct { @@ -467,3 +476,20 @@ func (t *topics) RemovePersistence(topic utils.TopicName) error { endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "persistence") return t.pulsar.Client.Delete(endpoint) } + +func (t *topics) GetDelayedDelivery(topic utils.TopicName) (*utils.DelayedDeliveryData, error) { + var delayedDeliveryData utils.DelayedDeliveryData + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery") + err := t.pulsar.Client.Get(endpoint, &delayedDeliveryData) + return &delayedDeliveryData, err +} + +func (t *topics) SetDelayedDelivery(topic utils.TopicName, delayedDeliveryData utils.DelayedDeliveryData) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery") + return t.pulsar.Client.Post(endpoint, &delayedDeliveryData) +} + +func (t *topics) RemoveDelayedDelivery(topic utils.TopicName) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery") + return t.pulsar.Client.Delete(endpoint) +} diff --git a/pulsaradmin/pkg/pulsar/utils/data.go b/pulsaradmin/pkg/pulsar/utils/data.go index 2aa8e8d2ae..95eb733072 100644 --- a/pulsaradmin/pkg/pulsar/utils/data.go +++ b/pulsaradmin/pkg/pulsar/utils/data.go @@ -370,3 +370,14 @@ type PersistenceData struct { BookkeeperAckQuorum int64 `json:"bookkeeperAckQuorum"` ManagedLedgerMaxMarkDeleteRate float64 `json:"managedLedgerMaxMarkDeleteRate"` } + +type DelayedDeliveryCmdData struct { + Enable bool `json:"enable"` + Disable bool `json:"disable"` + DelayedDeliveryTimeStr string `json:"delayedDeliveryTimeStr"` +} + +type DelayedDeliveryData struct { + TickTime float64 `json:"tickTime"` + Active bool `json:"active"` +}