-
Notifications
You must be signed in to change notification settings - Fork 25
/
http-request-preview.go
109 lines (93 loc) · 3.12 KB
/
http-request-preview.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package preview
import (
"context"
"encoding/json"
"sync"
"github.com/google/uuid"
"google.golang.org/protobuf/types/known/structpb"
flowpreviewv1 "github.com/fluxninja/aperture/v2/api/gen/proto/go/aperture/flowcontrol/preview/v1"
policylangv1 "github.com/fluxninja/aperture/v2/api/gen/proto/go/aperture/policy/language/v1"
"github.com/fluxninja/aperture/v2/pkg/log"
"github.com/fluxninja/aperture/v2/pkg/policies/flowcontrol/iface"
)
// HTTPRequestsPreviewRequest holds the samples while the preview is being generated.
type HTTPRequestsPreviewRequest struct {
mutex sync.Mutex
selectors []*policylangv1.Selector
previewResponse *flowpreviewv1.PreviewHTTPRequestsResponse
previewDoneCtx context.Context
previewDone context.CancelFunc
previewID iface.PreviewID
samples int64
}
// GetPreviewID returns the identifier stored in this preview request.
// The field is read without taking the mutex.
func (r *HTTPRequestsPreviewRequest) GetPreviewID() iface.PreviewID {
return r.previewID
}
// GetSelectors returns the selectors stored in this preview request.
// The slice is returned as stored (not copied) and is read without taking
// the mutex.
func (r *HTTPRequestsPreviewRequest) GetSelectors() []*policylangv1.Selector {
return r.selectors
}
// AddHTTPRequestPreview records request as one more sample of the preview.
//
// The call is a no-op once the remaining sample count has reached zero. If
// the request cannot be converted, the failure is logged and the sample is
// skipped without consuming quota. When the last outstanding sample has been
// stored, previewDone is invoked to signal completion.
func (r *HTTPRequestsPreviewRequest) AddHTTPRequestPreview(request map[string]interface{}) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	// Nothing left to collect.
	if r.samples <= 0 {
		return
	}

	// Per the original author's note, the request may carry json.Number
	// values that structpb rejects; a JSON encode/decode round trip turns
	// every number into a float64.
	encoded, err := json.Marshal(request)
	if err != nil {
		log.Errorf("failed to marshal request: %v", err)
		return
	}
	var normalized map[string]interface{}
	err = json.Unmarshal(encoded, &normalized)
	if err != nil {
		log.Errorf("failed to unmarshal request: %v", err)
		return
	}

	// Convert the normalized map into a protobuf Struct and store it.
	sample, err := structpb.NewStruct(normalized)
	if err != nil {
		log.Errorf("failed to encode HTTP request as structpb: %v", err)
		return
	}
	r.previewResponse.Samples = append(r.previewResponse.Samples, sample)

	// Count the sample and signal completion when the quota is exhausted.
	r.samples--
	if r.samples == 0 {
		r.previewDone()
	}
}
// PreviewHTTPRequests implements flowpreview.v1.PreviewHTTPRequests.
//
// It registers a preview built from req with the classifier, blocks until
// either req.Samples samples have been collected or ctx is done, removes the
// preview again and returns whatever samples were gathered. The returned
// error is always nil; if ctx ends first, the response holds only the samples
// collected up to that point.
//
// A request asking for zero or fewer samples is answered immediately with an
// empty response and never touches the classifier.
func (h *Handler) PreviewHTTPRequests(ctx context.Context, req *flowpreviewv1.PreviewRequest) (*flowpreviewv1.PreviewHTTPRequestsResponse, error) {
	// AddHTTPRequestPreview only signals completion after counting down from
	// a positive value, so a non-positive quota could never complete: the
	// call would block until ctx ends and then return an empty response.
	// Return that same empty response right away instead.
	if req.Samples <= 0 {
		return &flowpreviewv1.PreviewHTTPRequestsResponse{}, nil
	}

	// generate a unique ID for the preview request
	previewID := iface.PreviewID{
		RequestID: uuid.New().String(),
	}
	selectors := []*policylangv1.Selector{
		{
			ControlPoint: req.ControlPoint,
			LabelMatcher: req.LabelMatcher,
			AgentGroup:   h.agentGroup,
			Service:      req.Service,
		},
	}

	// create a new request object
	hr := &HTTPRequestsPreviewRequest{
		previewID:       previewID,
		selectors:       selectors,
		previewResponse: &flowpreviewv1.PreviewHTTPRequestsResponse{},
		samples:         req.Samples,
	}

	// make a context that is canceled when the preview is done; deriving it
	// from ctx also ends the wait when the caller goes away
	hr.previewDoneCtx, hr.previewDone = context.WithCancel(ctx)
	defer hr.previewDone()

	// add the request to the classifier
	h.classifier.AddPreview(hr)

	// wait for the preview to complete
	<-hr.previewDoneCtx.Done()

	// remove the request from the classifier
	h.classifier.DropPreview(hr)

	// If ctx ended before the quota was met, samples is still positive and a
	// concurrent AddHTTPRequestPreview could append to the response after it
	// has been returned. Zero the quota under the mutex so the response is
	// final from here on.
	// NOTE(review): assumes the classifier may still be inside
	// AddHTTPRequestPreview for a preview it looked up before DropPreview
	// took effect — confirm against the classifier implementation.
	hr.mutex.Lock()
	hr.samples = 0
	resp := hr.previewResponse
	hr.mutex.Unlock()

	return resp, nil
}