-
Notifications
You must be signed in to change notification settings - Fork 782
/
kv_list.go
147 lines (121 loc) · 3.13 KB
/
kv_list.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package dependency
import (
"encoding/gob"
"fmt"
"log"
"net/url"
"regexp"
"strings"
"github.com/pkg/errors"
)
var (
// Ensure implements
_ Dependency = (*KVListQuery)(nil)
// KVListQueryRe is the regular expression to use.
KVListQueryRe = regexp.MustCompile(`\A` + prefixRe + queryRe + dcRe + `\z`)
)
func init() {
gob.Register([]*KeyPair{})
}
// KeyPair is a simple Key-Value pair
type KeyPair struct {
Path string
Key string
Value string
// Lesser-used, but still valuable keys from api.KV
CreateIndex uint64
ModifyIndex uint64
LockIndex uint64
Flags uint64
Session string
}
// KVListQuery queries the KV store for a single key.
type KVListQuery struct {
stopCh chan struct{}
dc string
prefix string
namespace string
partition string
}
// NewKVListQuery parses a string into a dependency.
func NewKVListQuery(s string) (*KVListQuery, error) {
if s != "" && !KVListQueryRe.MatchString(s) {
return nil, fmt.Errorf("kv.list: invalid format: %q", s)
}
m := regexpMatch(KVListQueryRe, s)
queryParams, err := GetConsulQueryOpts(m, "kv.list")
if err != nil {
return nil, err
}
return &KVListQuery{
stopCh: make(chan struct{}, 1),
dc: m["dc"],
prefix: m["prefix"],
namespace: queryParams.Get(QueryNamespace),
partition: queryParams.Get(QueryPartition),
}, nil
}
// Fetch queries the Consul API defined by the given client.
func (d *KVListQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
select {
case <-d.stopCh:
return nil, nil, ErrStopped
default:
}
opts = opts.Merge(&QueryOptions{
Datacenter: d.dc,
ConsulPartition: d.partition,
ConsulNamespace: d.namespace,
})
log.Printf("[TRACE] %s: GET %s", d, &url.URL{
Path: "/v1/kv/" + d.prefix,
RawQuery: opts.String(),
})
list, qm, err := clients.Consul().KV().List(d.prefix, opts.ToConsulOpts())
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}
log.Printf("[TRACE] %s: returned %d pairs", d, len(list))
pairs := make([]*KeyPair, 0, len(list))
for _, pair := range list {
key := strings.TrimPrefix(pair.Key, d.prefix)
key = strings.TrimLeft(key, "/")
pairs = append(pairs, &KeyPair{
Path: pair.Key,
Key: key,
Value: string(pair.Value),
CreateIndex: pair.CreateIndex,
ModifyIndex: pair.ModifyIndex,
LockIndex: pair.LockIndex,
Flags: pair.Flags,
Session: pair.Session,
})
}
rm := &ResponseMetadata{
LastIndex: qm.LastIndex,
LastContact: qm.LastContact,
}
return pairs, rm, nil
}
// CanShare returns a boolean if this dependency is shareable.
func (d *KVListQuery) CanShare() bool {
return true
}
// String returns the human-friendly version of this dependency.
func (d *KVListQuery) String() string {
prefix := d.prefix
if d.dc != "" {
prefix = prefix + "@" + d.dc
}
return fmt.Sprintf("kv.list(%s)", prefix)
}
// Stop halts the dependency's fetch function.
func (d *KVListQuery) Stop() {
close(d.stopCh)
}
// Type returns the type of this dependency.
func (d *KVListQuery) Type() Type {
return TypeConsul
}