-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
proxy.go
159 lines (134 loc) · 4.31 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package proxy
import (
"crypto/x509"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/connect"
"github.com/hashicorp/consul/lib"
)
// Proxy implements the built-in connect proxy.
type Proxy struct {
client *api.Client
cfgWatcher ConfigWatcher
stopChan chan struct{}
logger hclog.Logger
service *connect.Service
}
// New returns a proxy with the given configuration source.
//
// The ConfigWatcher can be used to update the configuration of the proxy.
// Whenever a new configuration is detected, the proxy will reconfigure itself.
func New(client *api.Client, cw ConfigWatcher, logger hclog.Logger) (*Proxy, error) {
return &Proxy{
client: client,
cfgWatcher: cw,
stopChan: make(chan struct{}),
logger: logger,
}, nil
}
// Serve the proxy instance until a fatal error occurs or proxy is closed.
func (p *Proxy) Serve() error {
var cfg *Config
// failCh is used to stop Serve and return an error from another goroutine we
// spawn.
failCh := make(chan error, 1)
// Watch for config changes (initial setup happens on first "change")
for {
select {
case err := <-failCh:
// don't log here, we can log with better context at the point where we
// write the err to the chan
return err
case newCfg := <-p.cfgWatcher.Watch():
p.logger.Debug("got new config")
if cfg == nil {
// Initial setup
// Setup telemetry if configured
// NOTE(kit): As far as I can tell, all of the metrics in the proxy are generated at runtime, so we
// don't have any static metrics we initialize at start.
_, err := lib.InitTelemetry(newCfg.Telemetry, p.logger)
if err != nil {
p.logger.Error("proxy telemetry config error", "error", err)
}
// Setup Service instance now we know target ID etc
service, err := newCfg.Service(p.client, p.logger)
if err != nil {
return err
}
p.service = service
go func() {
<-service.ReadyWait()
p.logger.Info("Proxy loaded config and ready to serve")
tcfg := service.ServerTLSConfig()
cert, _ := tcfg.GetCertificate(nil)
leaf, _ := x509.ParseCertificate(cert.Certificate[0])
p.logger.Info("Parsed TLS identity", "uri", leaf.URIs[0])
// Only start a listener if we have a port set. This allows
// the configuration to disable our public listener.
if newCfg.PublicListener.BindPort != 0 {
newCfg.PublicListener.applyDefaults()
l := NewPublicListener(p.service, newCfg.PublicListener, p.logger)
err = p.startListener("public listener", l)
if err != nil {
// This should probably be fatal.
p.logger.Error("failed to start public listener", "error", err)
failCh <- err
}
}
}()
}
// TODO(banks) update/remove upstreams properly based on a diff with current. Can
// store a map of uc.String() to Listener here and then use it to only
// start one of each and stop/modify if changes occur.
for _, uc := range newCfg.Upstreams {
uc.applyDefaults()
if uc.LocalBindSocketPath != "" {
p.logger.Error("local_bind_socket_path is not supported with this proxy implementation. "+
"Can't start upstream.", "upstream", uc.String())
continue
}
if uc.LocalBindPort < 1 {
p.logger.Error("upstream has no local_bind_port. "+
"Can't start upstream.", "upstream", uc.String())
continue
}
l := NewUpstreamListener(p.service, p.client, uc, p.logger)
err := p.startListener(uc.String(), l)
if err != nil {
p.logger.Error("failed to start upstream",
"upstream", uc.String(),
"error", err,
)
}
}
cfg = newCfg
case <-p.stopChan:
if p.service != nil {
p.service.Close()
}
return nil
}
}
}
// startPublicListener is run from the internal state machine loop
func (p *Proxy) startListener(name string, l *Listener) error {
p.logger.Info("Starting listener", "listener", name, "bind_addr", l.BindAddr())
go func() {
err := l.Serve()
if err != nil {
p.logger.Error("listener stopped with error", "listener", name, "error", err)
return
}
p.logger.Info("listener stopped", "listener", name)
}()
go func() {
<-p.stopChan
l.Close()
}()
return nil
}
// Close stops the proxy and terminates all active connections. It must be
// called only once.
func (p *Proxy) Close() {
close(p.stopChan)
}