Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 60 additions & 8 deletions pubsub/rabbitmq/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,23 @@ package rabbitmq

import (
"fmt"
"net/url"
"strconv"
"time"

amqp "github.com/rabbitmq/amqp091-go"

"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)

type metadata struct {
consumerID string
host string
connectionString string
protocol string
hostname string
username string
password string
durable bool
enableDeadLetter bool
deleteWhenUnused bool
Expand All @@ -42,8 +48,16 @@ type metadata struct {
}

const (
metadataConsumerIDKey = "consumerID"
metadataHostKey = "host"
metadataConsumerIDKey = "consumerID"

metadataConnectionStringKey = "connectionString"
metadataHostKey = "host"

metadataProtocolKey = "protocol"
metadataHostnameKey = "hostname"
metadataUsernameKey = "username"
metadataPasswordKey = "password"

metadataDurableKey = "durable"
metadataEnableDeadLetterKey = "enableDeadLetter"
metadataDeleteWhenUnusedKey = "deletedWhenUnused"
Expand All @@ -61,8 +75,10 @@ const (
)

// createMetadata creates a new instance from the pubsub metadata.
func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*metadata, error) {
result := metadata{
protocol: "amqp",
hostname: "localhost",
durable: true,
deleteWhenUnused: true,
autoAck: false,
Expand All @@ -71,10 +87,27 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
publisherConfirm: false,
}

if val, found := pubSubMetadata.Properties[metadataHostKey]; found && val != "" {
result.host = val
} else {
return &result, fmt.Errorf("%s missing RabbitMQ host", errorMessagePrefix)
if val, found := pubSubMetadata.Properties[metadataConnectionStringKey]; found && val != "" {
result.connectionString = val
} else if val, found := pubSubMetadata.Properties[metadataHostKey]; found && val != "" {
result.connectionString = val
log.Warn("[DEPRECATION NOTICE] The 'host' argument is deprecated. Use 'connectionString' or individual connection arguments instead: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-rabbitmq/")
}

if val, found := pubSubMetadata.Properties[metadataProtocolKey]; found && val != "" {
result.protocol = val
}

if val, found := pubSubMetadata.Properties[metadataHostnameKey]; found && val != "" {
result.hostname = val
}

if val, found := pubSubMetadata.Properties[metadataUsernameKey]; found && val != "" {
result.username = val
}

if val, found := pubSubMetadata.Properties[metadataPasswordKey]; found && val != "" {
result.password = val
}

if val, found := pubSubMetadata.Properties[metadataConsumerIDKey]; found && val != "" {
Expand Down Expand Up @@ -184,3 +217,22 @@ func (m *metadata) formatQueueDeclareArgs(origin amqp.Table) amqp.Table {
func exchangeKindValid(kind string) bool {
return kind == amqp.ExchangeFanout || kind == amqp.ExchangeTopic || kind == amqp.ExchangeDirect || kind == amqp.ExchangeHeaders
}

func (m *metadata) connectionURI() string {
if m.connectionString != "" {
return m.connectionString
}

u := url.URL{
Scheme: m.protocol,
Host: m.hostname,
}

if m.username != "" && m.password != "" {
u.User = url.UserPassword(m.username, m.password)
} else if m.username != "" {
u.User = url.User(m.username)
}

return u.String()
}
Loading