Skip to content

Commit

Permalink
[azservicebus] AdminClient can now create topics and subscriptions (#…
Browse files Browse the repository at this point in the history
…16044)

Adding in ability to create/update/delete Topics and Subscriptions.
- Adding in proper request/response objects, as specified by guidelines.
- Adding in tests for handling low privilege failures
- (internal) Removing old _manager structs, in favor of the new AdminClient.

Fixes #15090, #15801
  • Loading branch information
richardpark-msft committed Nov 4, 2021
1 parent 535ef32 commit 14e0cee
Show file tree
Hide file tree
Showing 20 changed files with 2,515 additions and 1,561 deletions.
5 changes: 2 additions & 3 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
enough to fit into a single batch.
- Receiving from sessions using a SessionReceiver, created using Client.AcceptSessionFor(Queue|Subscription)
or Client.AcceptNextSessionFor(Queue|Subscription).
- Can fully create, update, delete and list queues (and queue runtime properties) using the `AdministrationClient`.
- Can now renew a message lock for a ReceivedMessage using Receiver.RenewMessageLock()
- Can now renew a session lock for a SessionReceiver using SessionReceiver.RenewSessionLock()
- Can fully create, update, delete and list queues, topics and subscriptions using the `AdministrationClient`.
- Can renew message and session locks, using Receiver.RenewMessageLock() and SessionReceiver.RenewSessionLock(), respectively.

### Bugs Fixed

Expand Down
175 changes: 0 additions & 175 deletions sdk/messaging/azservicebus/admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
package azservicebus

import (
"context"
"errors"
"net/http"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/atom"
Expand Down Expand Up @@ -46,175 +42,4 @@ func NewAdminClient(fullyQualifiedNamespace string, tokenCredential azcore.Token
return &AdminClient{em: em}, nil
}

// AddQueue creates a queue using defaults for all options.
func (ac *AdminClient) AddQueue(ctx context.Context, queueName string) (*QueueProperties, error) {
return ac.AddQueueWithProperties(ctx, &QueueProperties{
Name: queueName,
})
}

// CreateQueue creates a queue with configurable properties.
func (ac *AdminClient) AddQueueWithProperties(ctx context.Context, properties *QueueProperties) (*QueueProperties, error) {
return ac.createOrUpdateQueueImpl(ctx, properties, true)
}

// GetQueue gets a queue by name.
func (ac *AdminClient) GetQueue(ctx context.Context, queueName string) (*QueueProperties, error) {
name, desc, err := ac.getQueueImpl(ctx, queueName)

if err != nil {
return nil, err
}

return newQueueProperties(name, desc)
}

// GetQueueRuntimeProperties gets runtime properties of a queue, like the SizeInBytes, or ActiveMessageCount.
func (ac *AdminClient) GetQueueRuntimeProperties(ctx context.Context, queueName string) (*QueueRuntimeProperties, error) {
name, desc, err := ac.getQueueImpl(ctx, queueName)

if err != nil {
return nil, err
}

return newQueueRuntimeProperties(name, desc), nil
}

// QueueExists checks if a queue exists.
// Returns true if the queue is found
// (false, nil) if the queue is not found
// (false, err) if an error occurred while trying to check if the queue exists.
func (ac *AdminClient) QueueExists(ctx context.Context, queueName string) (bool, error) {
_, err := ac.GetQueue(ctx, queueName)

if err == nil {
return true, nil
}

var httpResponse azcore.HTTPResponse

if errors.As(err, &httpResponse) && httpResponse.RawResponse().StatusCode == 404 {
return false, nil
}

return false, err
}

// UpdateQueue updates an existing queue.
func (ac *AdminClient) UpdateQueue(ctx context.Context, properties *QueueProperties) (*QueueProperties, error) {
return ac.createOrUpdateQueueImpl(ctx, properties, false)
}

// DeleteQueue deletes a queue.
func (ac *AdminClient) DeleteQueue(ctx context.Context, queueName string) (*http.Response, error) {
resp, err := ac.em.Delete(ctx, "/"+queueName)

if err != nil {
return nil, err
}

return resp, nil
}

// ListQueuesOptions can be used to configure the ListQueues method.
type ListQueuesOptions struct {
// Top is the maximum size of each page of results.
Top int
// Skip is the starting index for the paging operation.
Skip int
}

// QueuePropertiesPager provides iteration over ListQueueProperties pages.
type QueuePropertiesPager interface {
// NextPage returns true if the pager advanced to the next page.
// Returns false if there are no more pages or an error occurred.
NextPage(context.Context) bool

// PageResponse returns the current QueueProperties.
PageResponse() []*QueueProperties

// Err returns the last error encountered while paging.
Err() error
}

// ListQueues lists queues.
func (ac *AdminClient) ListQueues(options *ListQueuesOptions) QueuePropertiesPager {
var pageSize int
var skip int

if options != nil {
skip = options.Skip
pageSize = options.Top
}

return &queuePropertiesPager{
adminClient: ac,
pageSize: pageSize,
skip: skip,
}
}

// ListQueuesRuntimePropertiesOptions can be used to configure the ListQueuesRuntimeProperties method.
type ListQueuesRuntimePropertiesOptions struct {
// Top is the maximum size of each page of results.
Top int
// Skip is the starting index for the paging operation.
Skip int
}

// QueueRuntimePropertiesPager provides iteration over ListQueueRuntimeProperties pages.
type QueueRuntimePropertiesPager interface {
// NextPage returns true if the pager advanced to the next page.
// Returns false if there are no more pages or an error occurred.
NextPage(context.Context) bool

// PageResponse returns the current QueueRuntimeProperties.
PageResponse() []*QueueRuntimeProperties

// Err returns the last error encountered while paging.
Err() error
}

// ListQueuesRuntimeProperties lists runtime properties for queues.
func (ac *AdminClient) ListQueuesRuntimeProperties(options *ListQueuesRuntimePropertiesOptions) QueueRuntimePropertiesPager {
var pageSize int
var skip int

if options != nil {
skip = options.Skip
pageSize = options.Top
}

return &queueRuntimePropertiesPager{
adminClient: ac,
pageSize: pageSize,
skip: skip,
}
}

// func (ac *AdminClient) GetNamespaceProperties() {}

// func (ac *AdminClient) CreateTopic() {}
// func (ac *AdminClient) CreateSubscription() {}
// func (ac *AdminClient) CreateRule() {}
// func (ac *AdminClient) DeleteTopic() {}
// func (ac *AdminClient) DeleteSubscription() {}
// func (ac *AdminClient) DeleteRule() {}
// func (ac *AdminClient) GetRule() {}
// func (ac *AdminClient) GetSubscription() {}
// func (ac *AdminClient) GetSubscriptionRuntimeProperties() {}
// func (ac *AdminClient) GetTopic() {}
// func (ac *AdminClient) GetTopicRuntimeProperties() {}
// func (ac *AdminClient) ListRules() {}
// func (ac *AdminClient) ListTopics() {}
// func (ac *AdminClient) ListTopicsRuntimeProperties() {}
// func (ac *AdminClient) ListSubscriptions() {}
// func (ac *AdminClient) ListSubscriptionsRuntimeProperties() {}

// func (ac *AdminClient) TopicExists() {}
// func (ac *AdminClient) SubscriptionExists() {}
// func (ac *AdminClient) RuleExists() {}

// func (ac *AdminClient) UpdateTopic() {}
// func (ac *AdminClient) UpdateSubscription() {}
// func (ac *AdminClient) UpdateRule() {}
97 changes: 0 additions & 97 deletions sdk/messaging/azservicebus/admin_client_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,103 +7,6 @@ import (
"time"
)

// QueueProperties represents the static properties of the queue.
type QueueProperties struct {
// Name of the queue relative to the namespace base address.
Name string

// LockDuration - The duration a message is locked when using the PeekLock receive mode.
// Default is 1 minute.
LockDuration *time.Duration

// MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory
// allocated for the queue.
// Default is 1024.
MaxSizeInMegabytes *int32

// RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection.
RequiresDuplicateDetection *bool

// RequiresSession indicates whether the queue supports the concept of sessions.
// Sessionful-messages follow FIFO ordering.
// Default is false.
RequiresSession *bool

// DefaultMessageTimeToLive is the duration after which the message expires, starting from when
// the message is sent to Service Bus. This is the default value used when TimeToLive is not
// set on a message itself.
DefaultMessageTimeToLive *time.Duration

// DeadLetteringOnMessageExpiration indicates whether this queue has dead letter
// support when a message expires.
DeadLetteringOnMessageExpiration *bool

// DuplicateDetectionHistoryTimeWindow is the duration of duplicate detection history.
// Default value is 10 minutes.
DuplicateDetectionHistoryTimeWindow *time.Duration

// MaxDeliveryCount is the maximum amount of times a message can be delivered before it is automatically
// sent to the dead letter queue.
// Default value is 10.
MaxDeliveryCount *int32

// EnableBatchedOperations indicates whether server-side batched operations are enabled.
EnableBatchedOperations *bool

// The current status of the queue.
Status *EntityStatus

// AutoDeleteOnIdle is the idle interval after which the queue is automatically deleted.
AutoDeleteOnIdle *time.Duration

// Indicates whether the queue is to be partitioned across multiple message brokers.
EnablePartitioning *bool

// ForwardTo is the name of the recipient entity to which all the messages sent to the queue
// are forwarded to.
ForwardTo *string

// ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages
ForwardDeadLetteredMessagesTo *string
}

// QueueRuntimeProperties represent dynamic properties of a queue, such as the ActiveMessageCount.
type QueueRuntimeProperties struct {
// Name is the name of the queue.
Name string

// SizeInBytes - The size of the queue, in bytes.
SizeInBytes int64

// CreatedAt is when the entity was created.
CreatedAt time.Time

// UpdatedAt is when the entity was last updated.
UpdatedAt time.Time

// AccessedAt is when the entity was last updated.
AccessedAt time.Time

// TotalMessageCount is the number of messages in the queue.
TotalMessageCount int64

// ActiveMessageCount is the number of active messages in the entity.
ActiveMessageCount int32

// DeadLetterMessageCount is the number of dead-lettered messages in the entity.
DeadLetterMessageCount int32

// ScheduledMessageCount is the number of messages that are scheduled to be enqueued.
ScheduledMessageCount int32

// TransferDeadLetterMessageCount is the number of messages transfer-messages which are dead-lettered
// into transfer-dead-letter subqueue.
TransferDeadLetterMessageCount int32

// TransferMessageCount is the number of messages which are yet to be transferred/forwarded to destination entity.
TransferMessageCount int32
}

// EntityStatus represents the current status of the entity.
type EntityStatus string

Expand Down

0 comments on commit 14e0cee

Please sign in to comment.