This repository has been archived by the owner on Jan 8, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 328
/
trigger.go
402 lines (347 loc) · 11.7 KB
/
trigger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
package httpapi
import (
"encoding/json"
"fmt"
"html"
"net/http"
"strconv"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
empty "google.golang.org/protobuf/types/known/emptypb"
"github.com/hashicorp/waypoint-plugin-sdk/terminal"
"github.com/hashicorp/waypoint/internal/clicontext"
pb "github.com/hashicorp/waypoint/pkg/server/gen"
"github.com/hashicorp/waypoint/pkg/serverclient"
"github.com/hashicorp/waypoint/pkg/serverconfig"
)
// Message is the message we return to the requester when streaming job output.
// One Message is JSON-encoded per streamed job event; every field carries an
// omitempty tag.
type Message struct {
	// JobId is the id of the job that was queued when running the requested trigger.
	JobId string `json:"jobId,omitempty"`
	// Value is the job stream event message to stream back to the requester.
	// Its concrete type depends on the event kind named by ValueType.
	Value interface{} `json:"value,omitempty"`
	// ValueType is the kind of job stream event, e.g. "TerminalEventLine" or "Complete".
	ValueType string `json:"valueType,omitempty"`
	// ExitCode is set once the job has completed. It is a string: "0" for
	// success, "1" for failure.
	ExitCode string `json:"exitCode,omitempty"`
	// Error is set when the job fails for any reason.
	Error interface{} `json:"error,omitempty"`
}
// HandleTrigger will execute a run trigger, if the requested id exists
// This works by connecting back to our own local gRPC server.
func HandleTrigger(addr string, tls bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := hclog.FromContext(ctx)
log.SetLevel(hclog.Debug)
// Authless trigger URLs should be able to make a request
// without a token.
token := r.URL.Query().Get("token")
requireAuth := true
if token == "" {
log.Trace("no token provided, will attempt to run authless trigger")
requireAuth = false
}
// Connect back to our own gRPC service.
grpcConn, err := serverclient.Connect(ctx,
serverclient.Logger(log),
serverclient.FromContextConfig(&clicontext.Config{
Server: serverconfig.Client{
Address: addr,
RequireAuth: requireAuth,
AuthToken: token,
// Our gRPC server should always be listening on TLS.
// We ignore it because its coming out of our own process.
Tls: tls,
TlsSkipVerify: true,
},
}),
)
if err != nil {
log.Error("trigger connection back to gRPC failed", "err", err)
return
}
defer grpcConn.Close()
// Our API client
client := pb.NewWaypointClient(grpcConn)
requestVars := mux.Vars(r)
runTriggerId := requestVars["id"]
variablesJSONRaw := r.URL.Query().Get("variables")
var (
variables map[string]string
variableOverrides []*pb.Variable
)
if variablesJSONRaw != "" {
if err := json.Unmarshal([]byte(variablesJSONRaw), &variables); err != nil {
http.Error(w,
fmt.Sprintf("failed to decode 'variables' json request param into a map: %s", err),
http.StatusInternalServerError)
return
}
for name, value := range variables {
v := &pb.Variable{
Name: name,
Source: &pb.Variable_Cli{Cli: &empty.Empty{}},
}
if valBool, err := strconv.ParseBool(value); err == nil {
v.Value = &pb.Variable_Bool{Bool: valBool}
} else if valInt, err := strconv.ParseInt(value, 10, 64); err == nil {
v.Value = &pb.Variable_Num{Num: valInt}
} else {
// NOTE: for this case, it can either be a "string" or
// complex HCL type like an array or map. We can set this value
// as a Variable_Str here, and when we go to parse the variables
// later we do the proper string versus HCL check in variables.go
v.Value = &pb.Variable_Str{Str: value}
}
variableOverrides = append(variableOverrides, v)
}
}
var resp *pb.RunTriggerResponse
runTriggerReq := &pb.RunTriggerRequest{
Ref: &pb.Ref_Trigger{
Id: runTriggerId,
},
VariableOverrides: variableOverrides,
}
// attempt to make a grpc request to run trigger by id
if requireAuth {
resp, err = client.RunTrigger(ctx, runTriggerReq)
} else {
resp, err = client.NoAuthRunTrigger(ctx, runTriggerReq)
}
if err != nil {
log.Error("server failed to run trigger", "id", runTriggerId, "err", err)
if status.Code(err) == codes.PermissionDenied {
http.Error(w,
fmt.Sprintf("request not authorized to run trigger: %s", err),
http.StatusUnauthorized)
} else {
// improve http error code, which is more applicable for general queue failures?
http.Error(w,
fmt.Sprintf("server failed to run trigger: %s", err),
http.StatusPreconditionFailed)
}
return
}
if resp == nil {
http.Error(w,
fmt.Sprintf("server returned no job ids from run trigger %q", html.EscapeString(runTriggerId)),
http.StatusInternalServerError)
return
}
triggerJobs := resp.JobIds
streamOutput := r.URL.Query().Get("stream")
log.Trace("jobs for trigger have been queued")
if streamOutput == "" {
// don't stream if not requested
w.WriteHeader(http.StatusNoContent)
return
}
// Attempt to stream output back, on request.
if !requireAuth {
// We do not allow streaming job stream info if a no-auth token trigger was requested
log.Trace("server does not allow for streaming job stream output for no-token trigger URLs")
w.WriteHeader(http.StatusNoContent)
return
}
log.Debug("attempting to stream back queued job output from running trigger")
cn, ok := w.(http.CloseNotifier)
if !ok {
log.Error("failed to stream job output, could not create http.CloseNotifier")
http.Error(w, "server failed to create http CloseNotifier", http.StatusInternalServerError)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
log.Error("failed to stream job output, could not create http.Flusher")
http.Error(w, "server failed to create http.Flusher", http.StatusInternalServerError)
return
}
// Send the initial headers saying we're gonna stream the response.
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()
enc := json.NewEncoder(w)
var (
wg sync.WaitGroup
mu sync.Mutex
)
log.Trace("starting job stream for jobs", "total_jobs", len(triggerJobs))
wg.Add(len(triggerJobs))
// NOTE(briancain): This loop starts N goroutines concurrently for
// each trigger job to stream back to the requester.
for _, jId := range triggerJobs {
go func(jId string) {
defer wg.Done()
stream, err := client.GetJobStream(ctx, &pb.GetJobStreamRequest{
JobId: jId,
})
if err != nil {
log.Error("server failed to get job stream output for trigger", "job_id", jId, "err", err)
http.Error(w,
fmt.Sprintf("server failed to obtain job stream output: %s", err),
http.StatusInternalServerError)
return
}
log.Trace("reading job stream for job", "job_id", jId)
// Wait for open confirmation
resp, err := stream.Recv()
if err != nil {
log.Error("server failed to stream job output", "err", err)
http.Error(w,
fmt.Sprintf("server failed to receive job stream output: %s", err),
http.StatusInternalServerError)
return
}
if _, ok := resp.Event.(*pb.GetJobStreamResponse_Open_); !ok {
log.Error("server failed to open job stream output, got unexpected message", "event", resp.Event)
http.Error(w,
fmt.Sprintf("job stream failed to open, got unexpected message: %T", resp.Event),
http.StatusInternalServerError)
return
}
var (
jobComplete bool
exitCode string
)
// read and send the stream
for {
select {
case <-cn.CloseNotify():
log.Trace("client closed connection to stream")
return
default:
// Get jobstream output and return Message back
time.Sleep(time.Second)
}
resp, err := stream.Recv()
if err != nil {
http.Error(w,
fmt.Sprintf("server failed to receive job stream output: %s", err),
http.StatusInternalServerError)
return
}
if resp == nil {
// This shouldn't happen, but if it does, just ignore it.
log.Warn("nil response received, ignoring")
continue
}
// the message to craft and return
var (
value interface{}
valueType string
msgErr interface{}
)
// handle events from job stream
switch event := resp.Event.(type) {
case *pb.GetJobStreamResponse_Complete_:
jobComplete = true
valueType = "Complete"
if event.Complete.Error == nil {
log.Info("job completed successfully")
exitCode = "0"
} else {
exitCode = "1"
st := status.FromProto(event.Complete.Error)
log.Warn("job failed", "code", st.Code(), "message", st.Message())
msgErr = st
}
case *pb.GetJobStreamResponse_Error_:
jobComplete = true
exitCode = "1"
valueType = "Error"
st := status.FromProto(event.Error.Error)
log.Warn("job stream failure", "code", st.Code(), "message", st.Message())
msgErr = st
case *pb.GetJobStreamResponse_Terminal_:
// We got some job output! Craft a message to be sent back
for _, ev := range event.Terminal.Events {
log.Trace("job terminal output", "event", ev)
switch ev := ev.Event.(type) {
case *pb.GetJobStreamResponse_Terminal_Event_Line_:
value = ev.Line.Msg
valueType = "TerminalEventLine"
case *pb.GetJobStreamResponse_Terminal_Event_NamedValues_:
var values []terminal.NamedValue
for _, tnv := range ev.NamedValues.Values {
values = append(values, terminal.NamedValue{
Name: tnv.Name,
Value: tnv.Value,
})
}
value = values
valueType = "TerminalEventNamedValues"
case *pb.GetJobStreamResponse_Terminal_Event_Status_:
value = ev.Status.Msg
valueType = "TerminalEventStatus"
case *pb.GetJobStreamResponse_Terminal_Event_Raw_:
value = string(ev.Raw.Data[:])
valueType = "TerminalEventRaw"
case *pb.GetJobStreamResponse_Terminal_Event_Table_:
tbl := terminal.NewTable(ev.Table.Headers...)
for _, row := range ev.Table.Rows {
var trow []terminal.TableEntry
for _, ent := range row.Entries {
trow = append(trow, terminal.TableEntry{
Value: ent.Value,
Color: ent.Color,
})
}
}
value = tbl
valueType = "TerminalEventTable"
case *pb.GetJobStreamResponse_Terminal_Event_Step_:
m := ev.Step.Msg
if len(ev.Step.Output) > 0 {
m = m + "\n" + string(ev.Step.Output[:])
}
value = m
valueType = "TerminalEventStep"
default:
log.Error("Unknown terminal event seen", "type", hclog.Fmt("%T", ev))
}
}
default:
log.Warn("unknown stream event", "event", resp.Event)
}
// Send a message job stream back to the client
if valueType != "" {
log.Trace("sending job data to client for job", "job_id", jId)
// Note that all empty values will be omitted
msg := Message{
JobId: jId,
ExitCode: exitCode,
Value: value,
ValueType: valueType,
Error: msgErr,
}
// Lock to ensure multiple routines don't send back a message at the same
// time to the receiver and mess up the incoming message
mu.Lock()
// send the message back
err := enc.Encode(msg)
if err != nil {
log.Error("failed to encode job stream output to send back", "err", err)
http.Error(w, fmt.Sprintf("server failed to encode job stream output: %s", err), 500)
mu.Unlock()
return
}
flusher.Flush()
mu.Unlock()
}
if jobComplete {
log.Trace("job complete, continuing to next job for streaming", "job_id", jId)
return
}
}
}(jId)
}
wg.Wait()
log.Trace("finished streaming trigger jobs")
}
}