-
Notifications
You must be signed in to change notification settings - Fork 783
/
catalog_nodes.go
140 lines (116 loc) · 3.07 KB
/
catalog_nodes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package dependency
import (
"encoding/gob"
"fmt"
"log"
"net/url"
"regexp"
"sort"
"github.com/pkg/errors"
)
// Compile-time assertion that *CatalogNodesQuery satisfies the Dependency
// interface.
var _ Dependency = (*CatalogNodesQuery)(nil)

// CatalogNodesQueryRe matches the string form of a catalog nodes query. The
// pattern is dcRe followed by nearRe, anchored to the very start (\A) and the
// very end (\z) of the input so that no stray text is accepted.
var CatalogNodesQueryRe = regexp.MustCompile(`\A` + dcRe + nearRe + `\z`)
// init registers the []*Node type with encoding/gob, which gob requires
// before a value of that concrete type can be encoded or decoded through an
// interface value.
func init() {
	nodes := make([]*Node, 0)
	gob.Register(nodes)
}
// Node describes a single node entry from the Consul catalog. Fetch fills
// each field from the identically named field of the catalog response.
type Node struct {
	// Node is the node's name; ByNode sorts on it first.
	Node string

	// Address is the node's address; ByNode uses it to break name ties.
	Address string

	// TaggedAddresses is copied as-is from the catalog response.
	TaggedAddresses map[string]string
}
// CatalogNodesQuery is the dependency that lists all nodes registered in the
// Consul catalog.
type CatalogNodesQuery struct {
	// stopCh is closed by Stop; Fetch checks it before doing any work.
	stopCh chan struct{}

	// dc and near are passed to the catalog request as the Datacenter and
	// Near query options. Either may be empty.
	dc, near string
}
// NewCatalogNodesQuery parses s into a catalog nodes dependency.
//
// s must match CatalogNodesQueryRe in full; otherwise a "catalog.nodes:
// invalid format" error is returned. The datacenter and near values are taken
// from the "dc" and "near" entries of the match, and a fresh stop channel is
// allocated for the returned query.
func NewCatalogNodesQuery(s string) (*CatalogNodesQuery, error) {
	if matched := CatalogNodesQueryRe.MatchString(s); !matched {
		return nil, fmt.Errorf("catalog.nodes: invalid format: %q", s)
	}

	fields := regexpMatch(CatalogNodesQueryRe, s)
	query := &CatalogNodesQuery{
		stopCh: make(chan struct{}, 1),
		dc:     fields["dc"],
		near:   fields["near"],
	}
	return query, nil
}
// Fetch asks the Consul catalog, through the given client set, for the list
// of nodes and returns it as a []*Node sorted with ByNode.
//
// The query's datacenter and near values are merged into opts before the
// request is made. If Stop has already been called, Fetch returns ErrStopped
// without contacting Consul. A failed request is returned wrapped with the
// query's string form. On success the response metadata carries the
// LastIndex and LastContact reported by Consul.
func (d *CatalogNodesQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
	// Non-blocking check: a closed stopCh means the dependency was stopped.
	select {
	case <-d.stopCh:
		return nil, nil, ErrStopped
	default:
	}

	opts = opts.Merge(&QueryOptions{Datacenter: d.dc, Near: d.near})

	endpoint := &url.URL{
		Path:     "/v1/catalog/nodes",
		RawQuery: opts.String(),
	}
	log.Printf("[TRACE] %s: GET %s", d, endpoint)

	catalogNodes, meta, err := clients.Consul().Catalog().Nodes(opts.ToConsulOpts())
	if err != nil {
		return nil, nil, errors.Wrap(err, d.String())
	}
	log.Printf("[TRACE] %s: returned %d results", d, len(catalogNodes))

	// Copy only the fields Node exposes.
	result := make([]*Node, len(catalogNodes))
	for i, cn := range catalogNodes {
		result[i] = &Node{
			Node:            cn.Node,
			Address:         cn.Address,
			TaggedAddresses: cn.TaggedAddresses,
		}
	}
	sort.Stable(ByNode(result))

	metadata := &ResponseMetadata{
		LastIndex:   meta.LastIndex,
		LastContact: meta.LastContact,
	}
	return result, metadata, nil
}
// CanShare reports whether this dependency is shareable. It is always true
// for a catalog nodes query.
func (d *CatalogNodesQuery) CanShare() bool {
	return true
}
// String returns the human-friendly form of this dependency.
//
// With neither a datacenter nor a near value set, the result is the bare
// "catalog.nodes". Otherwise the set values are appended in parentheses, the
// datacenter prefixed with "@" and the near value with "~", for example
// "catalog.nodes(@dc1~node1)".
func (d *CatalogNodesQuery) String() string {
	var suffix string
	if d.dc != "" {
		suffix += "@" + d.dc
	}
	if d.near != "" {
		suffix += "~" + d.near
	}

	if suffix != "" {
		return fmt.Sprintf("catalog.nodes(%s)", suffix)
	}
	return "catalog.nodes"
}
// Stop signals the dependency to halt by closing its stop channel; any later
// call to Fetch sees the closed channel and returns ErrStopped.
//
// Stop must be called at most once: closing an already-closed channel panics.
func (d *CatalogNodesQuery) Stop() {
	close(d.stopCh)
}
// Type reports the category of this dependency, which is always TypeConsul.
func (d *CatalogNodesQuery) Type() Type {
	return TypeConsul
}
// ByNode implements sort.Interface for a list of nodes, ordering them by node
// name and breaking ties by address. Both comparisons are plain string
// comparisons.
type ByNode []*Node

// Len returns the number of nodes in the list.
func (s ByNode) Len() int { return len(s) }

// Swap exchanges the nodes at indexes i and j.
func (s ByNode) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// Less reports whether the node at index i must sort before the node at
// index j.
//
// The comparison is strict: two nodes with the same name and address are
// never "less" than each other. sort.Interface requires this, and sort.Stable
// depends on it to keep equal nodes in their original relative order.
func (s ByNode) Less(i, j int) bool {
	if s[i].Node != s[j].Node {
		return s[i].Node < s[j].Node
	}
	return s[i].Address < s[j].Address
}