-
Notifications
You must be signed in to change notification settings - Fork 25
/
http-job.go
98 lines (82 loc) · 2.57 KB
/
http-job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package jobs
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// HTTPJob wraps a basic job along with HTTPJobConfig to execute an HTTP job.
type HTTPJob struct {
BasicJob
config HTTPJobConfig
}
// Make sure HTTPJob complies with Job interface.
var _ Job = (*HTTPJob)(nil)
// HTTPJobConfig is the configuration for an HTTP job.
type HTTPJobConfig struct {
Client *http.Client
Body BodyProvider
URL string
Method string
ExpectedBody string
Name string
ExpectedStatus int
}
// BodyProvider allows the users to provide a body to the HTTP jobs. For example for posting a payload as a job.
type BodyProvider func() io.Reader
// NewHTTPJob creates a new HTTPJob.
func NewHTTPJob(config HTTPJobConfig) *HTTPJob {
job := &HTTPJob{}
job.config = config
job.JobName = config.Name
job.JobFunc = func(ctx context.Context) (proto.Message, error) {
resp, err := fetchURL(ctx, config.Method, config.URL, config.Client, config.Body())
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != config.ExpectedStatus {
return nil, errors.Errorf("unexpected status code: %v, expected: %v", resp.StatusCode,
config.ExpectedStatus)
}
if config.ExpectedBody != "" {
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, errors.Errorf("failed to read response body: %v", err)
}
if !strings.Contains(string(body), config.ExpectedBody) {
return nil, errors.Errorf("body does not contain expected content '%v'", config.ExpectedBody)
}
}
message := wrapperspb.String(fmt.Sprintf("%s is accessible", config.URL))
return message, nil
}
return job
}
// Name returns the name of the job.
func (job *HTTPJob) Name() string {
return job.BasicJob.Name()
}
// JobWatchers returns the job watchers for the job.
func (job *HTTPJob) JobWatchers() JobWatchers {
return job.BasicJob.JobWatchers()
}
// Execute executes the job.
func (job *HTTPJob) Execute(ctx context.Context) (proto.Message, error) {
return job.BasicJob.Execute(ctx)
}
func fetchURL(ctx context.Context, method string, url string, client *http.Client, body io.Reader) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
return nil, errors.Errorf("unable to create job http request: %v", err)
}
resp, err := client.Do(req)
if err != nil {
return nil, errors.Errorf("fail to execute %v request: %v", method, err)
}
return resp, nil
}