Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option to create exchange in publisher #1

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
@@ -1 +1 @@
.idea/
.idea/
27 changes: 9 additions & 18 deletions consume.go
Expand Up @@ -160,30 +160,21 @@ func (consumer Consumer) startGoroutines(
}
}

if consumeOptions.BindingExchange != nil {
exchange := consumeOptions.BindingExchange
if exchange.Name == "" {
return fmt.Errorf("binding to exchange but name not specified")
if consumeOptions.ExchangeOptions != nil {
exchangeOptions := consumeOptions.ExchangeOptions
if exchangeOptions.Name == "" {
return fmt.Errorf("binding to exchangeOptions but name not specified")
}
if exchange.Declare {
err := consumer.chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.ExchangeArgs),
)
if err != nil {
return err
}

if err := declareOrVerifyExchange(consumeOptions.ExchangeOptions, consumer.chManager.channel); err != nil {
return err
}

for _, routingKey := range routingKeys {
err := consumer.chManager.channel.QueueBind(
queue,
routingKey,
exchange.Name,
exchangeOptions.Name,
consumeOptions.BindingNoWait,
tableToAMQPTable(consumeOptions.BindingArgs),
)
Expand Down
50 changes: 10 additions & 40 deletions consume_options.go
Expand Up @@ -9,9 +9,9 @@ func getDefaultConsumeOptions() ConsumeOptions {
QueueNoWait: false,
QueueDeclare: true,
QueueArgs: nil,
BindingExchange: nil,
BindingNoWait: false,
BindingArgs: nil,
ExchangeOptions: nil,
Concurrency: 1,
QOSPrefetch: 0,
QOSGlobal: false,
Expand All @@ -32,9 +32,9 @@ type ConsumeOptions struct {
QueueNoWait bool
QueueDeclare bool
QueueArgs Table
BindingExchange *BindingExchangeOptions
BindingNoWait bool
BindingArgs Table
ExchangeOptions *ExchangeOptions
Concurrency int
QOSPrefetch int
QOSGlobal bool
Expand All @@ -46,36 +46,6 @@ type ConsumeOptions struct {
ConsumerArgs Table
}

// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values.
func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions {
if options.BindingExchange == nil {
options.BindingExchange = &BindingExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
ExchangeArgs: nil,
Declare: true,
}
}
return options.BindingExchange
}

// BindingExchangeOptions are used when binding to an exchange.
// it will verify the exchange is created before binding to it.
type BindingExchangeOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
ExchangeArgs Table
Declare bool
}

// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges
func WithConsumeOptionsQueueDurable(options *ConsumeOptions) {
Expand Down Expand Up @@ -124,49 +94,49 @@ func WithConsumeOptionsQuorum(options *ConsumeOptions) {
// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to
func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Name = name
getConsumerExchangeOptionsOrSetDefault(options).Name = name
}
}

// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type
func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Kind = kind
getConsumerExchangeOptionsOrSetDefault(options).Kind = kind
}
}

// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag
func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Durable = true
getConsumerExchangeOptionsOrSetDefault(options).Durable = true
}

// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true
getConsumerExchangeOptionsOrSetDefault(options).AutoDelete = true
}

// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag
func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Internal = true
getConsumerExchangeOptionsOrSetDefault(options).Internal = true
}

// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag
func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).NoWait = true
getConsumerExchangeOptionsOrSetDefault(options).NoWait = true
}

// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args
getConsumerExchangeOptionsOrSetDefault(options).ExchangeArgs = args
}
}

// WithConsumeOptionsBindingExchangeSkipDeclare returns a function that skips the declaration of the
// binding exchange. Use this setting if the exchange already exists and you don't need to declare
// it on consumer start.
func WithConsumeOptionsBindingExchangeSkipDeclare(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Declare = false
getConsumerExchangeOptionsOrSetDefault(options).Declare = false
}

// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound
Expand Down
2 changes: 1 addition & 1 deletion examples/logger/main.go
Expand Up @@ -35,6 +35,7 @@ func main() {
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithPublisherOptionsLogger(mylogger),
rabbitmq.WithPublisherOptionsExchangeName("events"),
)
if err != nil {
log.Fatal(err)
Expand All @@ -45,7 +46,6 @@ func main() {
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion examples/publisher/main.go
Expand Up @@ -15,6 +15,7 @@ func main() {
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName("events"),
)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -65,7 +66,6 @@ func main() {
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Println(err)
Expand Down
71 changes: 71 additions & 0 deletions exchange.go
@@ -0,0 +1,71 @@
package rabbitmq

import amqp "github.com/rabbitmq/amqp091-go"

// ExchangeOptions are used when configuring or binding to an exchange.
// it will verify the exchange is created before binding to it.
type ExchangeOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
ExchangeArgs Table
Declare bool
}

// getConsumerExchangeOptionsOrSetDefault returns pointer to current Exchange options. if no Exchange options are set yet, it will set it with default values.
func getConsumerExchangeOptionsOrSetDefault(options *ConsumeOptions) *ExchangeOptions {
if options.ExchangeOptions == nil {
options.ExchangeOptions = getDefaultExchangeOptions()
}
return options.ExchangeOptions
}

// getPublisherExchangeOptionsOrSetDefault returns pointer to current Exchange options. if no Exchange options are set yet, it will set it with default values.
func getPublisherExchangeOptionsOrSetDefault(options *PublisherOptions) *ExchangeOptions {
if options.ExchangeOptions == nil {
options.ExchangeOptions = getDefaultExchangeOptions()
}
return options.ExchangeOptions
}

// getDefaultExchangeOptions returns pointer to the default Exchange options.
func getDefaultExchangeOptions() *ExchangeOptions {
return &ExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
ExchangeArgs: nil,
Declare: true,
}
}

// getDefaultExchangeOptions declares or verifies the existence of an exchange.
func declareOrVerifyExchange(exchangeOptions *ExchangeOptions, channel *amqp.Channel) error {
if exchangeOptions.Declare {
return channel.ExchangeDeclare(
exchangeOptions.Name,
exchangeOptions.Kind,
exchangeOptions.Durable,
exchangeOptions.AutoDelete,
exchangeOptions.Internal,
exchangeOptions.NoWait,
tableToAMQPTable(exchangeOptions.ExchangeArgs),
)
}

return channel.ExchangeDeclarePassive(
exchangeOptions.Name,
exchangeOptions.Kind,
exchangeOptions.Durable,
exchangeOptions.AutoDelete,
exchangeOptions.Internal,
exchangeOptions.NoWait,
tableToAMQPTable(exchangeOptions.ExchangeArgs),
)
}