-
-
Notifications
You must be signed in to change notification settings - Fork 125
/
lazy_client.go
122 lines (105 loc) · 2.19 KB
/
lazy_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package kafka
import (
"crypto/tls"
"fmt"
"log"
"sync"
"github.com/Shopify/sarama"
)
type LazyClient struct {
once sync.Once
initErr error
inner *Client
Config *Config
}
func (c *LazyClient) init() error {
var err error
c.once.Do(func() {
c.inner, err = NewClient(c.Config)
c.initErr = err
})
if c.Config != nil {
log.Printf("[DEBUG] lazy client init %s; config, %v", c.initErr, c.Config.copyWithMaskedSensitiveValues())
} else {
log.Printf("[DEBUG] lazy client init %s", c.initErr)
}
if c.initErr == sarama.ErrBrokerNotAvailable || c.initErr == sarama.ErrOutOfBrokers {
if c.Config.TLSEnabled {
tlsError := c.checkTLSConfig()
if tlsError != nil {
return fmt.Errorf("%w\n%s", tlsError, c.initErr)
}
}
}
return c.initErr
}
func (c *LazyClient) checkTLSConfig() error {
kafkaConfig, err := c.Config.newKafkaConfig()
if err != nil {
return err
}
brokers := *(c.Config.BootstrapServers)
broker := brokers[0]
tlsConf := kafkaConfig.Net.TLS.Config
conn, err := tls.Dial("tcp", broker, tlsConf)
if err != nil {
return err
}
return conn.Handshake()
}
func (c *LazyClient) CreateTopic(t Topic) error {
err := c.init()
if err != nil {
return err
}
return c.inner.CreateTopic(t)
}
func (c *LazyClient) ReadTopic(name string) (Topic, error) {
err := c.init()
if err != nil {
return Topic{}, err
}
return c.inner.ReadTopic(name)
}
func (c *LazyClient) UpdateTopic(t Topic) error {
err := c.init()
if err != nil {
return err
}
return c.inner.UpdateTopic(t)
}
func (c *LazyClient) DeleteTopic(t string) error {
err := c.init()
if err != nil {
return err
}
return c.inner.DeleteTopic(t)
}
func (c *LazyClient) AddPartitions(t Topic) error {
err := c.init()
if err != nil {
return err
}
return c.inner.AddPartitions(t)
}
func (c *LazyClient) CreateACL(s StringlyTypedACL) error {
err := c.init()
if err != nil {
return err
}
return c.inner.CreateACL(s)
}
func (c *LazyClient) ListACLs() ([]*sarama.ResourceAcls, error) {
err := c.init()
if err != nil {
return nil, err
}
return c.inner.ListACLs()
}
func (c *LazyClient) DeleteACL(s StringlyTypedACL) error {
err := c.init()
if err != nil {
return err
}
return c.inner.DeleteACL(s)
}