forked from mailgun/kafka-pixy
/
testhelpers.go
70 lines (60 loc) · 1.76 KB
/
testhelpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package testhelpers
import (
"net"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/mailgun/kafka-pixy/config"
"github.com/mailgun/kafka-pixy/logging"
)
const (
VagrantKafkaPeers = "192.168.100.67:9091"
VagrantZookeeperPeers = "192.168.100.67:2181"
)
var (
KafkaPeers []string
ZookeeperPeers []string
initTestOnce = sync.Once{}
)
func init() {
kafkaPeersStr := os.Getenv("KAFKA_PEERS")
if kafkaPeersStr == "" {
kafkaPeersStr = VagrantKafkaPeers
}
KafkaPeers = strings.Split(kafkaPeersStr, ",")
zookeeperPeersStr := os.Getenv("ZOOKEEPER_PEERS")
if zookeeperPeersStr == "" {
zookeeperPeersStr = VagrantZookeeperPeers
}
ZookeeperPeers = strings.Split(zookeeperPeersStr, ",")
}
// InitLogging initializes both internal and 3rd party loggers to output logs
// using the test context object's `Log` function.
func InitLogging() {
initTestOnce.Do(func() {
logging.Init(`[{"name": "console"}]`, nil)
})
}
func NewTestProxyCfg(clientID string) *config.Proxy {
cfg := config.DefaultProxy()
cfg.ClientID = clientID
cfg.Kafka.SeedPeers = KafkaPeers
cfg.ZooKeeper.SeedPeers = ZookeeperPeers
cfg.Consumer.LongPollingTimeout = 3000 * time.Millisecond
cfg.Consumer.RetryBackoff = 100 * time.Millisecond
cfg.Consumer.RebalanceDelay = 100 * time.Millisecond
cfg.Consumer.AckTimeout = 100 * time.Millisecond
cfg.Consumer.OffsetsCommitInterval = 100 * time.Millisecond
return cfg
}
// NewUDSHTTPClient creates an HTTP client that always connects to the
// specified unix domain socket ignoring the host part of requested HTTP URLs.
func NewUDSHTTPClient(unixSockAddr string) *http.Client {
dial := func(proto, addr string) (net.Conn, error) {
return net.Dial("unix", unixSockAddr)
}
tr := &http.Transport{Dial: dial}
return &http.Client{Transport: tr}
}