/
worker.go
253 lines (226 loc) · 7.59 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package servers
import (
"context"
"strings"
"github.com/fatih/structs"
"github.com/hashicorp/boundary/internal/db/timestamp"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/servers/store"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
)
// WorkerType names the kind of a worker as a string value.
type WorkerType string

// The worker types known to this package.
const (
	UnknownWorkerType WorkerType = "unknown"
	KmsWorkerType     WorkerType = "kms"
	PkiWorkerType     WorkerType = "pki"
)

// Valid reports whether t is KmsWorkerType or PkiWorkerType. Every other
// value, including UnknownWorkerType, is not valid.
func (t WorkerType) Valid() bool {
	return t == KmsWorkerType || t == PkiWorkerType
}

// String returns the underlying string value of t.
func (t WorkerType) String() string {
	return string(t)
}
// workerAuthWorkerId wraps a single worker id so that it can be turned into a
// map whose key comes from the mapstructure tag, as done by
// AttachWorkerIdToState.
type workerAuthWorkerId struct {
// WorkerId is the id of the worker; its mapstructure key is "worker_id".
WorkerId string `mapstructure:"worker_id"`
}
// AttachWorkerIdToState wraps workerId in a structpb.Struct for use with the
// Nodeenrollment lib. It is intended for worker authorization: AuthorizeNode
// in the lib accepts the option WithState, which lets the workerId be passed
// through to storage and associated with a WorkerAuth record.
//
// An InvalidParameter error is returned when workerId is empty. Any error
// from building the structpb.Struct is returned unchanged.
func AttachWorkerIdToState(ctx context.Context, workerId string) (*structpb.Struct, error) {
	// NOTE(review): op reads as a Worker method although this is a
	// package-level function; kept unchanged because it is part of the
	// emitted error text.
	const op = "servers.(Worker).AttachWorkerIdToState"
	if len(workerId) == 0 {
		return nil, errors.New(ctx, errors.InvalidParameter, op, "missing workerId")
	}

	// Build the map from the mapstructure tags rather than the default tag
	// name used by the structs package.
	converter := structs.New(&workerAuthWorkerId{WorkerId: workerId})
	converter.TagName = "mapstructure"
	return structpb.NewStruct(converter.Map())
}
// A Worker is a server that provides an address which can be used to proxy
// session connections. It can be tagged with custom tags and is used when
// authorizing and establishing a session. It is owned by a scope.
type Worker struct {
// Worker is the embedded store representation of the worker.
*store.Worker
// activeConnectionCount is the value reported by ActiveConnectionCount. Like
// the tag fields below it carries the `gorm:"-"` tag, which asks gorm to
// skip the field.
activeConnectionCount uint32 `gorm:"-"`
// apiTags are the tags set on the worker resource over the API.
apiTags []*Tag `gorm:"-"`
// configTags are the tags set through the worker daemon's configuration
// file.
configTags []*Tag `gorm:"-"`
// inputTags is not specified to be api or config tags and is not intended
// to be read by clients. Since config tags and api tags are applied in
// mutually exclusive contexts, inputTags is interpreted to be one or the
// other based on the context in which the worker is passed. As such
// inputTags should only be read when performing mutations on the database.
inputTags []*Tag `gorm:"-"`
}
// NewWorker returns a new Worker owned by the scope scopeId. The options
// WithName, WithDescription, WithAddress, and WithWorkerTags are honored; all
// other options are ignored. None of the worker reported values are set.
func NewWorker(scopeId string, opt ...Option) *Worker {
	cfg := getOpts(opt...)

	stored := &store.Worker{
		ScopeId:     scopeId,
		Name:        cfg.withName,
		Description: cfg.withDescription,
		Address:     cfg.withAddress,
	}

	w := &Worker{Worker: stored}
	// Tags supplied at construction are kept as inputTags; whether they act as
	// api or config tags is decided by the context the worker is used in.
	w.inputTags = cfg.withWorkerTags
	return w
}
// allocWorker returns a Worker value whose embedded store.Worker is allocated
// and empty; no other field is set.
func allocWorker() Worker {
	var w Worker
	w.Worker = new(store.Worker)
	return w
}
// clone returns a copy of w that shares no tag or store data with it. The
// embedded store.Worker is copied with proto.Clone, and each entry of apiTags,
// configTags, and inputTags is copied into a new Tag by Key and Value. A nil
// receiver yields nil, and a nil tag slice stays nil in the copy.
//
// activeConnectionCount is not carried over to the copy.
func (w *Worker) clone() *Worker {
	if w == nil {
		return nil
	}

	// copyTags duplicates a tag slice element by element, keeping a nil slice
	// nil and an empty slice empty.
	copyTags := func(src []*Tag) []*Tag {
		if src == nil {
			return nil
		}
		dst := make([]*Tag, len(src))
		for i, tag := range src {
			dst[i] = &Tag{Key: tag.Key, Value: tag.Value}
		}
		return dst
	}

	return &Worker{
		Worker:     proto.Clone(w.Worker).(*store.Worker),
		apiTags:    copyTags(w.apiTags),
		configTags: copyTags(w.configTags),
		inputTags:  copyTags(w.inputTags),
	}
}
// ActiveConnectionCount is the current number of sessions this worker is
// handling according to the controllers. A nil Worker reports zero, in line
// with the nil handling of GetLastStatusTime and clone.
func (w *Worker) ActiveConnectionCount() uint32 {
	if w == nil {
		return 0
	}
	return w.activeConnectionCount
}
// CanonicalTags is the deduplicated set of tags contained on both the resource
// set over the API as well as the tags reported by the worker itself.
//
// The result maps each tag key to its distinct values. Values keep the order
// in which they are first seen, api tags before config tags, so repeated calls
// return identically ordered slices. The returned map is never nil; a nil
// Worker yields an empty map.
func (w *Worker) CanonicalTags() map[string][]string {
	tags := make(map[string][]string)
	if w == nil {
		return tags
	}

	// seen tracks the key/value pairs already emitted so that a tag present in
	// both sources, or repeated within one, is reported only once.
	seen := make(map[Tag]struct{}, len(w.apiTags)+len(w.configTags))
	for _, source := range [][]*Tag{w.apiTags, w.configTags} {
		for _, t := range source {
			if _, dup := seen[*t]; dup {
				continue
			}
			seen[*t] = struct{}{}
			tags[t.Key] = append(tags[t.Key], t.Value)
		}
	}
	return tags
}
// GetConfigTags returns the tags for this worker which have been set through
// the worker daemon's configuration file, as a map from tag key to its values
// in the order they are stored. The returned map is never nil; a nil Worker
// yields an empty map, in line with the nil handling of GetLastStatusTime.
func (w *Worker) GetConfigTags() map[string][]string {
	tags := make(map[string][]string)
	if w == nil {
		return tags
	}
	for _, t := range w.configTags {
		tags[t.Key] = append(tags[t.Key], t.Value)
	}
	return tags
}
// GetLastStatusTime returns the last time the worker has reported to the
// controller its connection status. If the worker has never reported to a
// controller then nil is returned; this covers a nil Worker, a nil embedded
// store.Worker, an unset timestamp, and a timestamp equal to
// timestamp.NegativeInfinityTS.
func (w *Worker) GetLastStatusTime() *timestamp.Timestamp {
	if w == nil || w.Worker == nil {
		return nil
	}
	ts := w.Worker.GetLastStatusTime()
	if ts == nil {
		// Nothing stored; avoid calling AsTime on a nil timestamp.
		return nil
	}
	// Equal compares the instants themselves, whereas == on time values also
	// compares location and monotonic clock data.
	if ts.AsTime().Equal(timestamp.NegativeInfinityTS) {
		return nil
	}
	return ts
}
// TableName overrides the table name used by Worker, returning
// `server_worker`.
func (Worker) TableName() string {
	const table = "server_worker"
	return table
}
// workerAggregate contains an aggregated view of the values associated with
// a single worker. Its TableName is server_worker_aggregate, and toWorker
// converts it into a Worker.
type workerAggregate struct {
// PublicId is tagged as the gorm primary key.
PublicId string `gorm:"primary_key"`
ScopeId string
Name string
Description string
CreateTime *timestamp.Timestamp
UpdateTime *timestamp.Timestamp
Address string
Version uint32
Type string
// ApiTags holds the worker's api tags as one aggregated string, in the
// format parsed by tagsFromAggregatedTagString.
ApiTags string
// ActiveConnectionCount becomes the Worker's activeConnectionCount in
// toWorker.
ActiveConnectionCount uint32
// Config Fields
LastStatusTime *timestamp.Timestamp
// WorkerConfigTags holds the worker's config tags as one aggregated string,
// in the format parsed by tagsFromAggregatedTagString.
WorkerConfigTags string
}
// toWorker converts the aggregate into a Worker. The scalar fields are copied
// into the embedded store.Worker, ActiveConnectionCount is carried over, and
// the aggregated ApiTags and WorkerConfigTags strings are parsed into the
// worker's api and config tags. An error is returned when either tag string
// cannot be parsed; the error message names which of the two failed.
func (a *workerAggregate) toWorker(ctx context.Context) (*Worker, error) {
	const op = "servers.(workerAggregate).toWorker"
	worker := &Worker{
		Worker: &store.Worker{
			PublicId:       a.PublicId,
			Name:           a.Name,
			Description:    a.Description,
			Address:        a.Address,
			CreateTime:     a.CreateTime,
			UpdateTime:     a.UpdateTime,
			ScopeId:        a.ScopeId,
			Version:        a.Version,
			LastStatusTime: a.LastStatusTime,
			Type:           a.Type,
		},
		activeConnectionCount: a.ActiveConnectionCount,
	}

	apiTags, err := tagsFromAggregatedTagString(ctx, a.ApiTags)
	if err != nil {
		// Previously reported as a config tag failure, which pointed at the
		// wrong column.
		return nil, errors.Wrap(ctx, err, op, errors.WithMsg("error parsing api tag string"))
	}
	worker.apiTags = apiTags

	configTags, err := tagsFromAggregatedTagString(ctx, a.WorkerConfigTags)
	if err != nil {
		return nil, errors.Wrap(ctx, err, op, errors.WithMsg("error parsing config tag string"))
	}
	worker.configTags = configTags

	return worker, nil
}
// tagsFromAggregatedTagString parses a delimited string in the format returned
// by the database for the server_worker_aggregate view and returns []*Tag.
// The string is in the format of key1Yvalue1Zkey2Yvalue2Zkey3Yvalue3. Y and Z
// are chosen as delimiters since tag keys and values are restricted from
// having capitalized letters in them.
//
// An empty string yields nil tags and no error. An Internal error is returned
// as soon as a pair does not contain exactly one Y.
func tagsFromAggregatedTagString(ctx context.Context, s string) ([]*Tag, error) {
	const (
		op                 = "servers.tagsFromAggregatedTagString"
		aggregateDelimiter = "Z"
		pairDelimiter      = "Y"
	)
	if len(s) == 0 {
		return nil, nil
	}

	pairs := strings.Split(s, aggregateDelimiter)
	tags := make([]*Tag, 0, len(pairs))
	for _, pair := range pairs {
		// Exactly one pair delimiter must separate the key from the value.
		if strings.Count(pair, pairDelimiter) != 1 {
			return nil, errors.New(ctx, errors.Internal, op, "invalid aggregated tag pairs")
		}
		sep := strings.Index(pair, pairDelimiter)
		tags = append(tags, &Tag{
			Key:   pair[:sep],
			Value: pair[sep+len(pairDelimiter):],
		})
	}
	return tags, nil
}
// GetPublicId returns the PublicId field of the aggregate.
func (a *workerAggregate) GetPublicId() string {
	id := a.PublicId
	return id
}
// TableName returns the table name used by workerAggregate,
// `server_worker_aggregate`.
func (workerAggregate) TableName() string {
	const table = "server_worker_aggregate"
	return table
}