-
Notifications
You must be signed in to change notification settings - Fork 217
/
main.go
52 lines (48 loc) · 1.21 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main
import (
"context"
"fmt"
"log"
"net/url"
"os"
"strings"
ce "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
ceamqp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/amqp"
amqp "pack.ag/amqp"
)
// sampleConfig reads the AMQP_URL environment variable, substituting "/test"
// when it is unset or empty, and derives the transport settings from it.
//
// It returns the URL string unchanged as server, the URL path without its
// leading "/" as node, and, only when the URL carries user info, a single
// connection option built with amqp.ConnSASLPlain from that username and
// password; opts is nil otherwise. A URL that fails to parse terminates the
// process via log.Fatal.
func sampleConfig() (server, node string, opts []ceamqp.Option) {
	server = os.Getenv("AMQP_URL")
	if server == "" {
		server = "/test"
	}

	parsed, err := url.Parse(server)
	if err != nil {
		log.Fatal(err)
	}
	node = strings.TrimPrefix(parsed.Path, "/")

	if creds := parsed.User; creds != nil {
		// A URL with a user but no password yields the empty string here.
		password, _ := creds.Password()
		sasl := amqp.ConnSASLPlain(creds.Username(), password)
		opts = append(opts, ceamqp.WithConnOpt(sasl))
	}
	return server, node, opts
}
// main builds an AMQP transport from the values returned by sampleConfig,
// wraps it in a CloudEvents client, and starts a receiver whose handler prints
// every event it is given to standard output. A failure at any step is logged
// with log.Fatalf, which exits the process.
func main() {
	server, node, opts := sampleConfig()

	transport, err := ceamqp.New(server, node, opts...)
	if err != nil {
		log.Fatalf("Failed to create AMQP transport: %v", err)
	}

	receiver, err := client.New(transport)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}

	// Handler invoked by the client for each received event.
	printEvent := func(e ce.Event) {
		fmt.Printf("==== Got CloudEvent\n%+v\n----\n", e)
	}
	if err := receiver.StartReceiver(context.Background(), printEvent); err != nil {
		log.Fatalf("AMQP receiver error: %v", err)
	}
}