-
Notifications
You must be signed in to change notification settings - Fork 218
/
sender.go
45 lines (36 loc) · 1.05 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package amqp
import (
"context"
"pack.ag/amqp"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"
)
// sender wraps an amqp.Sender as a binding.Sender
type sender struct {
amqp *amqp.Sender
transformers binding.TransformerFactories
}
func (s *sender) Send(ctx context.Context, in binding.Message) error {
var err error
defer func() { _ = in.Finish(err) }()
if m, ok := in.(*Message); ok { // Already an AMQP message.
err = s.amqp.Send(ctx, m.AMQP)
return err
}
var amqpMessage amqp.Message
err = WriteMessage(ctx, in, &amqpMessage, s.transformers)
if err != nil {
return err
}
err = s.amqp.Send(ctx, &amqpMessage)
return err
}
func (s *sender) Close(ctx context.Context) error { return s.amqp.Close(ctx) }
// Create a new Sender which wraps an amqp.Sender in a binding.Sender
func NewSender(amqpSender *amqp.Sender, options ...SenderOptionFunc) protocol.Sender {
s := &sender{amqp: amqpSender, transformers: make(binding.TransformerFactories, 0)}
for _, o := range options {
o(s)
}
return s
}