-
Notifications
You must be signed in to change notification settings - Fork 23
/
fan_in.go
194 lines (175 loc) · 5.76 KB
/
fan_in.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package fiberapi
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"github.com/gojek/fiber"
fiberHttp "github.com/gojek/fiber/http"
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
"github.com/caraml-dev/turing/engines/router/missionctl/instrumentation"
"github.com/caraml-dev/turing/engines/experiment/runner"
"github.com/caraml-dev/turing/engines/router/missionctl/errors"
"github.com/caraml-dev/turing/engines/router/missionctl/experiment"
"github.com/caraml-dev/turing/engines/router/missionctl/instrumentation/tracing"
"github.com/caraml-dev/turing/engines/router/missionctl/turingctx"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/metrics"
)
// FanInID is used to indendify the fan in component when capturing a request span
const FanInID = "fan_in"
// EnsemblingFanIn combines the results from the fanout with the experiment parameters
// and forwards to the configured ensembling endpoint
type EnsemblingFanIn struct {
*experimentationPolicy
*routeSelectionPolicy
}
// Initialize is invoked by the Fiber library to initialize a new FanIn.
func (fanIn *EnsemblingFanIn) Initialize(properties json.RawMessage) error {
var err error
// Initialize appropriate fields
fanIn.experimentationPolicy, err = newExperimentationPolicy(properties)
if err != nil {
return errors.Wrapf(err, "Failed initializing experimentation policy on FanIn")
}
fanIn.routeSelectionPolicy, err = newRouteSelectionPolicy(properties)
if err != nil {
return errors.Wrapf(err, "Failed initializing route selection policy on FanIn")
}
return nil
}
// Aggregate requests for the treatment parameters from the configured experiment engine,
// collects the results from the fanout, dispatches the combined data to the configured
// ensembling endpoint and returns the result
func (fanIn *EnsemblingFanIn) Aggregate(
ctx context.Context,
req fiber.Request,
queue fiber.ResponseQueue,
) fiber.Response {
// Monitor for the results
respCh := queue.Iter()
// Store the available results and the experiment response
responses := make(map[string]fiber.Response)
var experimentResponse *experiment.Response
// Obtain the experiment response channel from the context, to write the response to
expCtxCh, expCtxChErr := experiment.GetExperimentResponseChannel(ctx)
if expCtxChErr == nil {
defer close(expCtxCh)
}
// Request for the experiment treatment and save it to expRespCh asynchronously
expRespCh := make(chan *experiment.Response, 1)
go func() {
turingReqID, _ := turingctx.GetRequestID(ctx)
options := runner.GetTreatmentOptions{
TuringRequestID: turingReqID,
}
expPlan, expPlanErr := fanIn.experimentEngine.
GetTreatmentForRequest(req.Header(), req.Payload(), options)
// Write to channel
expRespCh <- experiment.NewResponse(expPlan, expPlanErr)
close(expRespCh)
}()
// Wait on the results from the individual routes and experiment engine, for as long as
// timeout is not reached and we don't have all the results.
timeout := false
for (respCh != nil || expRespCh != nil) && !timeout {
select {
case resp, ok := <-respCh:
if ok {
responses[resp.BackendName()] = resp
} else {
// Channel closed, stop reading from respCh
respCh = nil
}
case expResp, ok := <-expRespCh:
if ok {
// Update the experiment response to be returned
experimentResponse = expResp
// Copy experiment response to the experiment result channel in the context
if expCtxChErr == nil {
expCtxCh <- expResp
}
} else {
// Channel closed, stop reading from expRespCh
expRespCh = nil
}
case <-ctx.Done():
timeout = true
}
}
// Associate span to context to trace response ensembling, if tracing enabled
if tracing.Glob().IsEnabled() {
var sp opentracing.Span
sp, _ = tracing.Glob().StartSpanFromContext(ctx, FanInID)
if sp != nil {
defer sp.Finish()
}
}
return fanIn.collectResponses(responses, experimentResponse)
}
// collectResponses collects all responses and treatment in the
// format expected by the ensembler
func (fanIn *EnsemblingFanIn) collectResponses(
responses map[string]fiber.Response,
expResponse *experiment.Response,
) fiber.Response {
result := CombinedResponse{
RouteResponses: make([]RouteResponse, len(responses)),
}
if expResponse != nil {
result.Experiment = *expResponse
}
// Collect all treatment responses
idx := 0
for k, v := range responses {
t := RouteResponse{
Route: k,
Data: v.Payload(),
IsDefault: k == fanIn.defaultRoute,
}
result.RouteResponses[idx] = t
idx++
}
// Marshal the response, measure time
var err error
timer := metrics.Glob().MeasureDurationMs(
instrumentation.TuringComponentRequestDurationMs,
map[string]func() string{
"status": func() string {
return metrics.GetStatusString(err == nil)
},
"component": func() string {
return "fanin_marshalResponse"
},
"traffic_rule": func() string { return "" },
},
)
rBytes, err := jsoniter.Marshal(result)
timer()
if err != nil {
return fiber.NewErrorResponse(err)
}
// Return successful response
resp := http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewBuffer(rBytes)),
Header: http.Header{
"Content-Type": []string{"application/json"},
},
}
return fiberHttp.NewHTTPResponse(&resp)
}
// RouteResponse captures the result of each experiment
type RouteResponse struct {
Route string `json:"route"`
Data json.RawMessage `json:"data"`
IsDefault bool `json:"is_default"`
}
// CombinedResponse captures the structure of the final response sent back by the fan in
type CombinedResponse struct {
// List of responses from each treatment
RouteResponses []RouteResponse `json:"route_responses"`
// Configuration / Error response from experiment engine
Experiment experiment.Response `json:"experiment"`
}