-
Notifications
You must be signed in to change notification settings - Fork 18
/
rabbitfixture.go
103 lines (86 loc) · 2.4 KB
/
rabbitfixture.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package rabbitfixture
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"os"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/makasim/amqpextra"
amqp "github.com/rabbitmq/amqp091-go"
"gotest.tools/v3/assert"
"github.com/circleci/ex/config/secret"
"github.com/circleci/ex/o11y"
"github.com/circleci/ex/testing/internal/types"
"github.com/circleci/ex/testing/rabbitfixture/internal/rabbit"
)
func Dialer(ctx context.Context, t types.TestingTB, u string) *amqpextra.Dialer {
dialer, err := amqpextra.NewDialer(
amqpextra.WithContext(ctx),
amqpextra.WithURL(u),
amqpextra.WithConnectionProperties(amqp.Table{
"connection_name": "rabbitfixture",
}),
)
assert.Assert(t, err)
t.Cleanup(dialer.Close)
return dialer
}
func New(ctx context.Context, t types.TestingTB) (rawurl string) {
t.Helper()
return NewWithConnection(ctx, t, Connection{})
}
type Connection struct {
Host string
User string
Password secret.String
}
func NewWithConnection(ctx context.Context, t types.TestingTB, con Connection) (rawurl string) {
t.Helper()
ctx, span := o11y.StartSpan(ctx, "rabbitfixure: vhost")
defer span.End()
if con.Host == "" {
con.Host = "localhost"
}
if con.User == "" {
con.User = "guest"
}
if con.Password.Value() == "" {
con.Password = "guest"
}
vhost := fmt.Sprintf("%s-%s", t.Name(), randomSuffix())
span.AddField("vhost", vhost)
rawurl = fmt.Sprintf("amqp://%s:%s@%s/%s", con.User, con.Password.Value(), con.Host, vhost)
span.AddField("url", rawurl)
client := rabbit.NewClient(fmt.Sprintf("http://%s:15672", con.Host), con.User, con.Password)
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = 5 * time.Second
if os.Getenv("CI") == "true" {
bo.MaxElapsedTime = 32 * time.Second
}
_, err := client.ListVHosts(ctx)
assert.Assert(t, err)
// Delete is idempotent, and will not error for non-existent vhost
err = client.DeleteVHost(ctx, vhost)
assert.Assert(t, err)
err = client.PutVHost(ctx, vhost, rabbit.VHostSettings{})
assert.Assert(t, err)
err = client.UpdatePermissionsIn(ctx, vhost, "guest", rabbit.Permissions{
Configure: ".*",
Write: ".*",
Read: ".*",
})
t.Cleanup(func() {
err = client.DeleteVHost(ctx, vhost)
assert.Check(t, err)
})
return rawurl
}
func randomSuffix() string {
bytes := make([]byte, 10)
if _, err := rand.Read(bytes); err != nil {
return "not-random--i-hope-thats-ok"
}
return hex.EncodeToString(bytes)
}