/
task.http.go
176 lines (152 loc) · 4.69 KB
/
task.http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package pipeline
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/SeerLink/seerlink/core/logger"
"github.com/SeerLink/seerlink/core/store/models"
"github.com/SeerLink/seerlink/core/utils"
)
type (
MaybeBool string
)
const (
MaybeBoolTrue = MaybeBool("true")
MaybeBoolFalse = MaybeBool("false")
MaybeBoolNull = MaybeBool("")
)
func MaybeBoolFromString(s string) (MaybeBool, error) {
switch s {
case "true":
return MaybeBoolTrue, nil
case "false":
return MaybeBoolFalse, nil
case "":
return MaybeBoolNull, nil
default:
return "", errors.Errorf("unknown value for bool: %s", s)
}
}
func (m MaybeBool) Bool() (b bool, isSet bool) {
switch m {
case MaybeBoolTrue:
return true, true
case MaybeBoolFalse:
return false, true
default:
return false, false
}
}
type HTTPTask struct {
BaseTask `mapstructure:",squash"`
Method string
URL models.WebURL
RequestData HttpRequestData `json:"requestData"`
AllowUnrestrictedNetworkAccess MaybeBool
config Config
}
type PossibleErrorResponses struct {
Error string `json:"error"`
ErrorMessage string `json:"errorMessage"`
}
var _ Task = (*HTTPTask)(nil)
var (
promHTTPFetchTime = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "pipeline_task_http_fetch_time",
Help: "Time taken to fully execute the HTTP request",
},
[]string{"pipeline_task_spec_id"},
)
promHTTPResponseBodySize = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "pipeline_task_http_response_body_size",
Help: "Size (in bytes) of the HTTP response body",
},
[]string{"pipeline_task_spec_id"},
)
)
func (t *HTTPTask) Type() TaskType {
return TaskTypeHTTP
}
func (t *HTTPTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error {
return nil
}
func (t *HTTPTask) Run(ctx context.Context, taskRun TaskRun, inputs []Result) Result {
if len(inputs) > 0 {
return Result{Error: errors.Wrapf(ErrWrongInputCardinality, "HTTPTask requires 0 inputs")}
}
var bodyReader io.Reader
if t.RequestData != nil {
bodyBytes, err := json.Marshal(t.RequestData)
if err != nil {
return Result{Error: errors.Wrap(err, "failed to encode request body as JSON")}
}
bodyReader = bytes.NewReader(bodyBytes)
}
request, err := http.NewRequest(t.Method, t.URL.String(), bodyReader)
if err != nil {
return Result{Error: errors.Wrap(err, "failed to create http.Request")}
}
request.Header.Set("Content-Type", "application/json")
config := utils.HTTPRequestConfig{
Timeout: t.config.DefaultHTTPTimeout().Duration(),
MaxAttempts: t.config.DefaultMaxHTTPAttempts(),
SizeLimit: t.config.DefaultHTTPLimit(),
AllowUnrestrictedNetworkAccess: t.allowUnrestrictedNetworkAccess(),
}
httpRequest := utils.HTTPRequest{
Request: request,
Config: config,
}
start := time.Now()
responseBytes, statusCode, err := httpRequest.SendRequest(ctx)
if err != nil {
if ctx.Err() != nil {
return Result{Error: errors.New("http request timed out or interrupted")}
}
return Result{Error: errors.Wrapf(err, "error making http request")}
}
elapsed := time.Since(start)
promHTTPFetchTime.WithLabelValues(fmt.Sprintf("%d", taskRun.PipelineTaskSpecID)).Set(float64(elapsed))
promHTTPResponseBodySize.WithLabelValues(fmt.Sprintf("%d", taskRun.PipelineTaskSpecID)).Set(float64(len(responseBytes)))
if statusCode >= 400 {
maybeErr := bestEffortExtractError(responseBytes)
return Result{Error: errors.Errorf("got error from %s: (status code %v) %s", t.URL.String(), statusCode, maybeErr)}
}
logger.Debugw("HTTP task got response",
"response", string(responseBytes),
"url", t.URL.String(),
"pipelineTaskSpecID", taskRun.PipelineTaskSpecID,
)
// NOTE: We always stringify the response since this is required for all current jobs.
// If a binary response is required we might consider adding an adapter
// flag such as "BinaryMode: true" which passes through raw binary as the
// value instead.
return Result{Value: string(responseBytes)}
}
func (t *HTTPTask) allowUnrestrictedNetworkAccess() bool {
b, isSet := t.AllowUnrestrictedNetworkAccess.Bool()
if isSet {
return b
}
return t.config.DefaultHTTPAllowUnrestrictedNetworkAccess()
}
func bestEffortExtractError(responseBytes []byte) string {
var resp PossibleErrorResponses
err := json.Unmarshal(responseBytes, &resp)
if err != nil {
return ""
}
if resp.Error != "" {
return resp.Error
} else if resp.ErrorMessage != "" {
return resp.ErrorMessage
}
return string(responseBytes)
}