forked from juju/juju
/
apiaddressupdater.go
178 lines (155 loc) · 4.95 KB
/
apiaddressupdater.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Copyright 2014 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package apiaddressupdater
import (
"fmt"
"sync"
"github.com/juju/errors"
"github.com/juju/worker/v3"
corenetwork "github.com/DavinZhang/juju/core/network"
"github.com/DavinZhang/juju/core/watcher"
"github.com/DavinZhang/juju/network"
)
// APIAddresser is an interface that is provided to NewAPIAddressUpdater
// which can be used to watch for API address changes.
type APIAddresser interface {
APIHostPorts() ([]corenetwork.ProviderHostPorts, error)
WatchAPIHostPorts() (watcher.NotifyWatcher, error)
}
// APIAddressSetter is an interface that is provided to NewAPIAddressUpdater
// whose SetAPIHostPorts method will be invoked whenever address changes occur.
type APIAddressSetter interface {
SetAPIHostPorts(servers []corenetwork.HostPorts) error
}
// Config defines the operation of a Worker.
type Config struct {
Addresser APIAddresser
Setter APIAddressSetter
Logger Logger
}
// Validate returns an error if config cannot drive a Worker.
func (config Config) Validate() error {
if config.Addresser == nil {
return errors.NotValidf("nil Addresser")
}
if config.Setter == nil {
return errors.NotValidf("nil Setter")
}
if config.Logger == nil {
return errors.NotValidf("nil Logger")
}
return nil
}
// APIAddressUpdater is responsible for propagating API addresses.
//
// In practice, APIAddressUpdater is used by a machine agent to watch
// API addresses in state and write the changes to the agent's config file.
type APIAddressUpdater struct {
config Config
mu sync.Mutex
current []corenetwork.ProviderHostPorts
}
// NewAPIAddressUpdater returns a worker.Worker that watches for changes to
// API addresses and then sets them on the APIAddressSetter.
func NewAPIAddressUpdater(config Config) (worker.Worker, error) {
if err := config.Validate(); err != nil {
return nil, err
}
handler := &APIAddressUpdater{
config: config,
}
w, err := watcher.NewNotifyWorker(watcher.NotifyConfig{
Handler: handler,
})
if err != nil {
return nil, errors.Trace(err)
}
return w, nil
}
// SetUp is part of the watcher.NotifyHandler interface.
func (c *APIAddressUpdater) SetUp() (watcher.NotifyWatcher, error) {
return c.config.Addresser.WatchAPIHostPorts()
}
// Handle is part of the watcher.NotifyHandler interface.
func (c *APIAddressUpdater) Handle(_ <-chan struct{}) error {
hps, err := c.getAddresses()
if err != nil {
return err
}
// Logging to identify lp: 1888453
if len(hps) == 0 {
c.config.Logger.Warningf("empty API host ports received. Updating using existing entries.")
}
c.config.Logger.Debugf("updating API hostPorts to %+v", hps)
c.mu.Lock()
// Protection case to possible help with lp: 1888453
if len(hps) != 0 {
c.current = hps
} else {
hps = c.current
}
c.mu.Unlock()
// API host/port entries are stored in state as SpaceHostPorts.
// When retrieved, the space IDs are reconciled so that they are returned
// as ProviderHostPorts.
// Here, we indirect them because they are ultimately just stored as dial
// address strings. This could be re-evaluated in the future if the space
// information becomes worthwhile to agents.
hpsToSet := make([]corenetwork.HostPorts, len(hps))
for i, hps := range hps {
hpsToSet[i] = hps.HostPorts()
}
if err := c.config.Setter.SetAPIHostPorts(hpsToSet); err != nil {
return fmt.Errorf("error setting addresses: %v", err)
}
return nil
}
func (c *APIAddressUpdater) getAddresses() ([]corenetwork.ProviderHostPorts, error) {
addresses, err := c.config.Addresser.APIHostPorts()
if err != nil {
return nil, fmt.Errorf("error getting addresses: %v", err)
}
// Filter out any LXC or LXD bridge addresses.
// See LP bugs #1416928 and #1567683.
hpsToSet := make([]corenetwork.ProviderHostPorts, 0)
for _, hostPorts := range addresses {
// Strip ports, filter, then add ports again.
filtered := network.FilterBridgeAddresses(hostPorts.Addresses())
hps := make(corenetwork.ProviderHostPorts, 0, len(filtered))
for _, hostPort := range hostPorts {
for _, addr := range filtered {
if addr.Value == hostPort.Value {
hps = append(hps, hostPort)
}
}
}
if len(hps) > 0 {
hpsToSet = append(hpsToSet, hps)
}
}
// Logging to identify lp: 1888453
if len(hpsToSet) == 0 {
c.config.Logger.Warningf("get address returning zero results after filtering, non filtered list: %v", addresses)
}
return hpsToSet, nil
}
// TearDown is part of the watcher.NotifyHandler interface.
func (c *APIAddressUpdater) TearDown() error {
return nil
}
// Report shows up in the dependency engine report.
func (c *APIAddressUpdater) Report() map[string]interface{} {
report := make(map[string]interface{})
c.mu.Lock()
defer c.mu.Unlock()
var servers [][]string
for _, server := range c.current {
var addresses []string
for _, addr := range server {
addresses = append(addresses, corenetwork.DialAddress(addr))
}
servers = append(servers, addresses)
}
report["servers"] = servers
return report
}