/
scaling_endpoint.go
204 lines (178 loc) · 6.25 KB
/
scaling_endpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package nomad
import (
"strings"
"time"
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
// Scaling endpoint is used for listing and retrieving scaling policies
type Scaling struct {
srv *Server
logger log.Logger
}
// ListPolicies is used to list the policies
func (p *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error {
if done, err := p.srv.forward("Scaling.ListPolicies", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "scaling", "list_policies"}, time.Now())
if args.RequestNamespace() == structs.AllNamespacesSentinel {
return p.listAllNamespaces(args, reply)
}
if aclObj, err := p.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil {
hasListScalingPolicies := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListScalingPolicies)
hasListAndReadJobs := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListJobs) &&
aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob)
if !(hasListScalingPolicies || hasListAndReadJobs) {
return structs.ErrPermissionDenied
}
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Iterate over all the policies
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.ScalingPoliciesByIDPrefix(ws, args.RequestNamespace(), prefix)
} else if job := args.Job; job != "" {
iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job, args.Type)
} else {
iter, err = state.ScalingPoliciesByNamespace(ws, args.Namespace, args.Type)
}
if err != nil {
return err
}
// Convert all the policies to a list stub
reply.Policies = nil
for raw := iter.Next(); raw != nil; raw = iter.Next() {
policy := raw.(*structs.ScalingPolicy)
reply.Policies = append(reply.Policies, policy.Stub())
}
// Use the last index that affected the policy table
index, err := state.Index("scaling_policy")
if err != nil {
return err
}
// Don't return index zero, otherwise a blocking query cannot be used.
if index == 0 {
index = 1
}
reply.Index = index
return nil
}}
return p.srv.blockingRPC(&opts)
}
// GetPolicy is used to get a specific policy
func (p *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest,
reply *structs.SingleScalingPolicyResponse) error {
if done, err := p.srv.forward("Scaling.GetPolicy", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "scaling", "get_policy"}, time.Now())
// Check for list-job permissions
if aclObj, err := p.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil {
hasReadScalingPolicy := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadScalingPolicy)
hasListAndReadJobs := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListJobs) &&
aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob)
if !(hasReadScalingPolicy || hasListAndReadJobs) {
return structs.ErrPermissionDenied
}
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Iterate over all the policies
p, err := state.ScalingPolicyByID(ws, args.ID)
if err != nil {
return err
}
reply.Policy = p
// Use the last index that affected the policy table
index, err := state.Index("scaling_policy")
if err != nil {
return err
}
// Ensure we never set the index to zero, otherwise a blocking query cannot be used.
// We floor the index at one, since realistically the first write must have a higher index.
if index == 0 {
index = 1
}
reply.Index = index
return nil
}}
return p.srv.blockingRPC(&opts)
}
func (j *Scaling) listAllNamespaces(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error {
// Check for list-job permissions
aclObj, err := j.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
prefix := args.QueryOptions.Prefix
allow := func(ns string) bool {
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListScalingPolicies) ||
(aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) && aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob))
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// check if user has permission to all namespaces
allowedNSes, err := allowedNSes(aclObj, state, allow)
if err == structs.ErrPermissionDenied {
// return empty if token isn't authorized for any namespace
reply.Policies = []*structs.ScalingPolicyListStub{}
return nil
} else if err != nil {
return err
}
// Capture all the policies
var iter memdb.ResultIterator
if args.Type != "" {
iter, err = state.ScalingPoliciesByTypePrefix(ws, args.Type)
} else {
iter, err = state.ScalingPolicies(ws)
}
if err != nil {
return err
}
var policies []*structs.ScalingPolicyListStub
for raw := iter.Next(); raw != nil; raw = iter.Next() {
policy := raw.(*structs.ScalingPolicy)
if allowedNSes != nil && !allowedNSes[policy.Target[structs.ScalingTargetNamespace]] {
// not permitted to this name namespace
continue
}
if prefix != "" && !strings.HasPrefix(policy.ID, prefix) {
continue
}
policies = append(policies, policy.Stub())
}
reply.Policies = policies
// Use the last index that affected the policies table or summary
index, err := state.Index("scaling_policy")
if err != nil {
return err
}
reply.Index = helper.Uint64Max(1, index)
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}