Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support IBM amqp #2530

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ All notable changes to this project will be documented in this file.
- New `reject_errored` output.
- New `nats_request_reply` processor.
- New `json_documents` scanner.
- Field `persistent` added to the `amqp_1` output to specify whether message delivery should be persistent.
- Add IBM amqp support
- Field `target_capabilities` added to the `amqp_1` output to specify sender extension capabilities.
- Field `message_properties_to` added to the `amqp_1` output to specify the intended destination of the message.
- Field `source_capabilities` added to the `amqp_1` input to specify receiver extension capabilities.

### Fixed

Expand Down
23 changes: 23 additions & 0 deletions docs/modules/components/pages/inputs/amqp_1.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ input:
root_cas: ""
root_cas_file: ""
client_certs: []
source_capabilities: [] # No default (optional)
sasl:
mechanism: none
user: ""
Expand Down Expand Up @@ -326,6 +327,28 @@ password: foo
password: ${KEY_PASSWORD}
```

=== `source_capabilities`

List of extension capabilities the receiver desires.


*Type*: `array`


```yml
# Examples

source_capabilities:
- queue

source_capabilities:
- topic

source_capabilities:
- queue
- topic
```

=== `sasl`

Enables SASL authentication.
Expand Down
48 changes: 48 additions & 0 deletions docs/modules/components/pages/outputs/amqp_1.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ output:
password: ""
metadata:
exclude_prefixes: []
persistent: false
target_capabilities: [] # No default (optional)
message_properties_to: amqp://localhost:5672/ # No default (optional)
```

--
Expand Down Expand Up @@ -377,4 +380,49 @@ Provide a list of explicit metadata key prefixes to be excluded when adding meta

*Default*: `[]`

=== `persistent`

Whether message delivery should be persistent (transient by default).


*Type*: `bool`

*Default*: `false`

=== `target_capabilities`

List of extension capabilities the sender desires.


*Type*: `array`


```yml
# Examples

target_capabilities:
- queue

target_capabilities:
- topic

target_capabilities:
- queue
- topic
```

=== `message_properties_to`

Identifies the node that is the intended destination of the message.


*Type*: `string`


```yml
# Examples

message_properties_to: amqp://localhost:5672/
```


4 changes: 4 additions & 0 deletions internal/impl/amqp1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ const (
azureRenewLockField = "azure_renew_lock"
getMessageHeaderField = "read_header"
creditField = "credit"
sourceCapsField = "source_capabilities"

// Output
targetAddrField = "target_address"
appPropsMapField = "application_properties_map"
metaFilterField = "metadata"
persistentField = "persistent"
targetCapsField = "target_capabilities"
messagePropsTo = "message_properties_to"
)

// ErrSASLMechanismNotSupported is returned if a SASL mechanism was not recognised.
Expand Down
24 changes: 20 additions & 4 deletions internal/impl/amqp1/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ func amqp1InputSpec() *service.ConfigSpec {
Default(64).
Advanced(),
service.NewTLSToggledField(tlsField),
service.NewStringListField(sourceCapsField).
Description("List of extension capabilities the receiver desires.").
Optional().
Advanced().
Example([]string{"queue"}).
Example([]string{"topic"}).
Example([]string{"queue", "topic"}),
saslFieldSpec(),
).LintRule(`
root = if this.url.or("") == "" && this.urls.or([]).length() == 0 {
Expand All @@ -101,7 +108,9 @@ type amqp1Reader struct {
renewLock bool
getHeader bool
credit int // max_in_flight
srcCaps []string
connOpts *amqp.ConnOptions
recvOpts *amqp.ReceiverOptions
log *service.Logger

m sync.RWMutex
Expand All @@ -112,6 +121,7 @@ func amqp1ReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (
a := amqp1Reader{
log: mgr.Logger(),
connOpts: &amqp.ConnOptions{},
recvOpts: &amqp.ReceiverOptions{},
}

urlStrs, err := conf.FieldStringList(urlsField)
Expand All @@ -135,7 +145,6 @@ func amqp1ReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (
}

a.urls = append(a.urls, singleURL)

}

if a.sourceAddr, err = conf.FieldString(sourceAddrField); err != nil {
Expand All @@ -153,6 +162,7 @@ func amqp1ReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (
if a.credit, err = conf.FieldInt(creditField); err != nil {
return nil, err
}
a.recvOpts.Credit = int32(a.credit)

if err := saslOptFnsFromParsed(conf, a.connOpts); err != nil {
return nil, err
Expand All @@ -166,6 +176,14 @@ func amqp1ReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (
a.connOpts.TLSConfig = tlsConf
}

a.srcCaps, err = conf.FieldStringList(sourceCapsField)
if err != nil {
return nil, err
}
if len(a.srcCaps) != 0 {
a.recvOpts.SourceCapabilities = a.srcCaps
}

return &a, nil
}

Expand Down Expand Up @@ -194,9 +212,7 @@ func (a *amqp1Reader) Connect(ctx context.Context) (err error) {
}

// Create a receiver
if conn.receiver, err = conn.session.NewReceiver(ctx, a.sourceAddr, &amqp.ReceiverOptions{
Credit: int32(a.credit),
}); err != nil {
if conn.receiver, err = conn.session.NewReceiver(ctx, a.sourceAddr, a.recvOpts); err != nil {
_ = conn.Close(ctx)
return
}
Expand Down
54 changes: 51 additions & 3 deletions internal/impl/amqp1/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,22 @@ This output benefits from sending multiple messages in flight in parallel for im
saslFieldSpec(),
service.NewMetadataExcludeFilterField(metaFilterField).
Description("Specify criteria for which metadata values are attached to messages as headers."),
service.NewBoolField(persistentField).
Description("Whether message delivery should be persistent (transient by default).").
Advanced().
Default(false),
service.NewStringListField(targetCapsField).
Description("List of extension capabilities the sender desires.").
Optional().
Advanced().
Example([]string{"queue"}).
Example([]string{"topic"}).
Example([]string{"queue", "topic"}),
service.NewStringField(messagePropsTo).
Description("Identifies the node that is the intended destination of the message.").
Optional().
Advanced().
Example("amqp://localhost:5672/"),
).LintRule(`
root = if this.url.or("") == "" && this.urls.or([]).length() == 0 {
"field 'urls' must be set"
Expand Down Expand Up @@ -105,15 +121,20 @@ type amqp1Writer struct {
metaFilter *service.MetadataExcludeFilter
applicationPropertiesMap *bloblang.Executor
connOpts *amqp.ConnOptions
senderOpts *amqp.SenderOptions
persistent bool
targetCaps []string
msgTo string

log *service.Logger
connLock sync.RWMutex
}

func amqp1WriterFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (*amqp1Writer, error) {
a := amqp1Writer{
log: mgr.Logger(),
connOpts: &amqp.ConnOptions{},
log: mgr.Logger(),
connOpts: &amqp.ConnOptions{},
senderOpts: &amqp.SenderOptions{},
}

urlStrs, err := conf.FieldStringList(urlsField)
Expand Down Expand Up @@ -164,6 +185,25 @@ func amqp1WriterFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (
if a.metaFilter, err = conf.FieldMetadataExcludeFilter(metaFilterField); err != nil {
return nil, err
}

if a.persistent, err = conf.FieldBool(persistentField); err != nil {
return nil, err
}

a.targetCaps, err = conf.FieldStringList(targetCapsField)
if err != nil {
return nil, err
}
if len(a.targetCaps) != 0 {
a.senderOpts.TargetCapabilities = a.targetCaps
}

if conf.Contains(messagePropsTo) {
if a.msgTo, err = conf.FieldString(messagePropsTo); err != nil {
return nil, err
}
}

return &a, nil
}

Expand Down Expand Up @@ -193,7 +233,7 @@ func (a *amqp1Writer) Connect(ctx context.Context) (err error) {
}

// Create a sender
if sender, err = session.NewSender(ctx, a.targetAddr, nil); err != nil {
if sender, err = session.NewSender(ctx, a.targetAddr, a.senderOpts); err != nil {
_ = session.Close(ctx)
_ = client.Close()
return
Expand Down Expand Up @@ -280,6 +320,14 @@ func (a *amqp1Writer) Write(ctx context.Context, msg *service.Message) error {
return nil
})

if a.persistent {
m.Header = &amqp.MessageHeader{Durable: true}
}

if a.msgTo != "" {
m.Properties = &amqp.MessageProperties{To: &a.msgTo}
}

if err = s.Send(ctx, m, nil); err != nil {
if ctx.Err() == nil {
a.log.Errorf("Lost connection due to: %v\n", err)
Expand Down