-
Notifications
You must be signed in to change notification settings - Fork 8
/
executor.go
102 lines (81 loc) · 3.27 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package runner
import (
"context"
"sync"
"time"
"github.com/b2wdigital/restQL-golang/v4/internal/domain"
"github.com/b2wdigital/restQL-golang/v4/pkg/restql"
)
// Executor process statements into a result
// by executing the relevant HTTP calls to
// the upstream dependency.
type Executor struct {
client domain.HTTPClient
log restql.Logger
resourceTimeout time.Duration
forwardPrefix string
}
// NewExecutor constructs an instance of Executor.
func NewExecutor(log restql.Logger, client domain.HTTPClient, resourceTimeout time.Duration, forwardPrefix string) Executor {
return Executor{client: client, log: log, resourceTimeout: resourceTimeout, forwardPrefix: forwardPrefix}
}
// DoStatement process a single statement into a result by executing the relevant HTTP calls to the upstream dependency.
func (e Executor) DoStatement(ctx context.Context, statement domain.Statement, queryCtx restql.QueryContext) restql.DoneResource {
log := restql.GetLogger(ctx)
drOptions := DoneResourceOptions{
IgnoreErrors: statement.IgnoreErrors,
MaxAge: statement.CacheControl.MaxAge,
SMaxAge: statement.CacheControl.SMaxAge,
}
emptyChainedParams := GetEmptyChainedParams(statement)
if len(emptyChainedParams) > 0 {
emptyChainedResponse := NewEmptyChainedResponse(log, emptyChainedParams, drOptions)
log.Debug("request execution skipped due to empty chained parameters", "resource", statement.Resource, "method", statement.Method)
return emptyChainedResponse
}
request := MakeRequest(e.resourceTimeout, e.forwardPrefix, statement, queryCtx)
log.Debug("executing request for statement", "resource", statement.Resource, "method", statement.Method, "request", request)
response, err := e.client.Do(ctx, request)
if err != nil {
errorResponse := NewErrorResponse(log, err, request, response, drOptions)
log.Debug("request execution failed", "error", err, "resource", statement.Resource, "method", statement.Method, "response", errorResponse)
return errorResponse
}
dr := NewDoneResource(request, response, drOptions)
log.Debug("request execution done", "resource", statement.Resource, "method", statement.Method, "response", dr)
return dr
}
// DoMultiplexedStatement process multiplexed statements into a result by executing the relevant HTTP calls to the upstream dependency.
func (e Executor) DoMultiplexedStatement(ctx context.Context, statements []interface{}, queryCtx restql.QueryContext) restql.DoneResources {
responseChans := make([]chan interface{}, len(statements))
for i := range responseChans {
responseChans[i] = make(chan interface{}, 1)
}
var wg sync.WaitGroup
wg.Add(len(statements))
for i, stmt := range statements {
i, stmt := i, stmt
ch := responseChans[i]
go func() {
response := e.doCurrentStatement(ctx, stmt, queryCtx)
ch <- response
wg.Done()
}()
}
wg.Wait()
responses := make(restql.DoneResources, len(statements))
for i, ch := range responseChans {
responses[i] = <-ch
}
return responses
}
func (e Executor) doCurrentStatement(ctx context.Context, stmt interface{}, queryCtx restql.QueryContext) interface{} {
switch stmt := stmt.(type) {
case domain.Statement:
return e.DoStatement(ctx, stmt, queryCtx)
case []interface{}:
return e.DoMultiplexedStatement(ctx, stmt, queryCtx)
default:
return nil
}
}