Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Commit

Permalink
[HTTP Probe] Add support for running multiple HTTP requests in parallel.
Browse files Browse the repository at this point in the history
Github issue: #319

PiperOrigin-RevId: 281802925
  • Loading branch information
manugarg committed Nov 21, 2019
1 parent c7d43b3 commit 5d8563a
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 82 deletions.
52 changes: 25 additions & 27 deletions probes/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Probe struct {
// book-keeping params
targets []endpoint.Endpoint
httpRequests map[string]*http.Request
results map[string]*result
results map[string]*probeResult
protocol string
method string
url string
Expand All @@ -72,7 +72,7 @@ type Probe struct {
statsExportFrequency int64
}

type result struct {
type probeResult struct {
total, success, timeouts int64
latency metrics.Value
respCodes *metrics.Map
Expand Down Expand Up @@ -101,18 +101,13 @@ func (p *Probe) Init(name string, opts *options.Options) error {
return fmt.Errorf("Invalid Relative URL: %s, must begin with '/'", p.url)
}

if p.c.GetRequestsPerProbe() != 1 {
p.l.Warningf("requests_per_probe field is now deprecated and will be removed in future releases.")
}

// Create a transport for our use. This is mostly based on
// http.DefaultTransport with some timeouts changed.
// TODO(manugarg): Considering cloning DefaultTransport once
// https://github.com/golang/go/issues/26013 is fixed.
dialer := &net.Dialer{
Timeout: p.opts.Timeout,
KeepAlive: 30 * time.Second, // TCP keep-alive
DualStack: true,
}

if p.opts.SourceIP != nil {
Expand Down Expand Up @@ -182,13 +177,18 @@ func isClientTimeout(err error) bool {
}

// httpRequest executes an HTTP request and updates the provided result struct.
func (p *Probe) doHTTPRequest(req *http.Request, result *result) {
func (p *Probe) doHTTPRequest(req *http.Request, result *probeResult, resultMu *sync.Mutex) {
start := time.Now()
result.total++

resp, err := p.client.Do(req)
latency := time.Since(start)

// Note that we take lock on result object outside of the actual request.
resultMu.Lock()
defer resultMu.Unlock()

result.total++

if err != nil {
if isClientTimeout(err) {
p.l.Warning("Target:", req.Host, ", URL:", req.URL.String(), ", http.doHTTPRequest: timeout error: ", err.Error())
Expand Down Expand Up @@ -237,7 +237,7 @@ func (p *Probe) updateTargets() {
}

if p.results == nil {
p.results = make(map[string]*result, len(p.targets))
p.results = make(map[string]*probeResult, len(p.targets))
}

for _, target := range p.targets {
Expand All @@ -259,7 +259,7 @@ func (p *Probe) updateTargets() {
} else {
latencyValue = metrics.NewFloat(0)
}
p.results[target.Name] = &result{
p.results[target.Name] = &probeResult{
latency: latencyValue,
respCodes: metrics.NewMap("code", metrics.NewInt(0)),
respBodies: metrics.NewMap("resp", metrics.NewInt(0)),
Expand All @@ -275,28 +275,26 @@ func (p *Probe) runProbe(ctx context.Context) {

wg := sync.WaitGroup{}
for _, target := range p.targets {
req := p.httpRequests[target.Name]
req, result := p.httpRequests[target.Name], p.results[target.Name]
if req == nil {
continue
}

wg.Add(1)
// We launch a separate goroutine for each HTTP request. Since there can be
// multiple requests per probe per target, we use a mutex to protect access
// to per-target result object in doHTTPRequest. Note that result object is
// not accessed concurrently anywhere else -- export of the metrics happens
// when probe is not running.
var resultMu sync.Mutex

// Launch a separate goroutine for each target.
go func(target string, req *http.Request) {
defer wg.Done()
numRequests := int32(0)
for {
p.doHTTPRequest(req.WithContext(reqCtx), p.results[target])
for numReq := int32(0); numReq < p.c.GetRequestsPerProbe(); numReq++ {
wg.Add(1)

numRequests++
if numRequests >= p.c.GetRequestsPerProbe() {
break
}
// Sleep for requests_interval_msec before continuing.
time.Sleep(time.Duration(p.c.GetRequestsIntervalMsec()) * time.Millisecond)
}
}(target.Name, req)
go func(req *http.Request, result *probeResult) {
defer wg.Done()
p.doHTTPRequest(req.WithContext(reqCtx), result, &resultMu)
}(req, result)
}
}

// Wait until all probes are done.
Expand Down
138 changes: 102 additions & 36 deletions probes/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package http
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"testing"
"time"

Expand All @@ -32,7 +35,9 @@ import (

// The Transport is mocked instead of the Client because Client is not an
// interface, but RoundTripper (which Transport implements) is.
type testTransport struct{}
type testTransport struct {
noBody io.ReadCloser
}

func newTestTransport() *testTransport {
return &testTransport{}
Expand All @@ -46,11 +51,20 @@ type testReadCloser struct {
func (trc *testReadCloser) Read(p []byte) (n int, err error) {
return trc.b.Read(p)
}

func (trc *testReadCloser) Close() error {
return nil
}

func (tt *testTransport) RoundTrip(req *http.Request) (*http.Response, error) {
if req.URL.Host == "fail-test.com" {
return nil, errors.New("failing for fail-target.com")
}

if req.Body == nil {
return &http.Response{Body: http.NoBody}, nil
}

b, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, err
Expand All @@ -66,7 +80,7 @@ func (tt *testTransport) RoundTrip(req *http.Request) (*http.Response, error) {

func (tt *testTransport) CancelRequest(req *http.Request) {}

func testProbe(opts *options.Options) ([]*result, error) {
func testProbe(opts *options.Options) ([]*probeResult, error) {
p := &Probe{}
err := p.Init("http_test", opts)
if err != nil {
Expand All @@ -76,23 +90,16 @@ func testProbe(opts *options.Options) ([]*result, error) {

p.runProbe(context.Background())

var results []*result
var results []*probeResult
for _, target := range p.targets {
results = append(results, p.results[target.Name])
}
return results, nil
}

func TestRun(t *testing.T) {
methods := []configpb.ProbeConf_Method{
configpb.ProbeConf_GET,
configpb.ProbeConf_POST,
configpb.ProbeConf_PUT,
configpb.ProbeConf_HEAD,
configpb.ProbeConf_DELETE,
configpb.ProbeConf_PATCH,
configpb.ProbeConf_OPTIONS,
100, // Should default to configpb.ProbeConf_GET
func TestProbeVariousMethods(t *testing.T) {
mpb := func(s string) *configpb.ProbeConf_Method {
return configpb.ProbeConf_Method(configpb.ProbeConf_Method_value[s]).Enum()
}

testBody := "Test HTTP Body"
Expand All @@ -105,39 +112,43 @@ func TestRun(t *testing.T) {
{&configpb.ProbeConf{}, "total: 1, success: 1"},
{&configpb.ProbeConf{Protocol: configpb.ProbeConf_HTTPS.Enum()}, "total: 1, success: 1"},
{&configpb.ProbeConf{RequestsPerProbe: proto.Int32(1)}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: &methods[0]}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: &methods[1]}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: &methods[1], Body: &testBody}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: &methods[2]}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: &methods[2], Body: &testBody}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: &methods[3]}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: &methods[4]}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: &methods[5]}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: &methods[6]}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: &methods[7]}, "total: 1, success: 1"},
{&configpb.ProbeConf{RequestsPerProbe: proto.Int32(4)}, "total: 4, success: 4"},
{&configpb.ProbeConf{Method: mpb("GET")}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: mpb("POST")}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: mpb("POST"), Body: &testBody}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: mpb("PUT")}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: mpb("PUT"), Body: &testBody}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: mpb("HEAD")}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: mpb("DELETE")}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: mpb("PATCH")}, "total: 1, success: 1"},
{&configpb.ProbeConf{Method: mpb("OPTIONS")}, "total: 1, success: 1"},
{&configpb.ProbeConf{Headers: []*configpb.ProbeConf_Header{{Name: &testHeaderName, Value: &testHeaderValue}}}, "total: 1, success: 1"},
}

for _, test := range tests {
opts := &options.Options{
Targets: targets.StaticTargets("test.com"),
Interval: 2 * time.Second,
Timeout: time.Second,
ProbeConf: test.input,
}
results, err := testProbe(opts)
if err != nil {
if fmt.Sprintf("error: '%s'", err.Error()) != test.want {
t.Errorf("Unexpected initialization error: %v", err)
for i, test := range tests {
t.Run(fmt.Sprintf("Test_case(%d)_config(%v)", i, test.input), func(t *testing.T) {
opts := &options.Options{
Targets: targets.StaticTargets("test.com"),
Interval: 2 * time.Second,
Timeout: time.Second,
ProbeConf: test.input,
}

results, err := testProbe(opts)
if err != nil {
if fmt.Sprintf("error: '%s'", err.Error()) != test.want {
t.Errorf("Unexpected initialization error: %v", err)
}
return
}
} else {

for _, result := range results {
got := fmt.Sprintf("total: %d, success: %d", result.total, result.success)
if got != test.want {
t.Errorf("Mismatch got '%s', want '%s'", got, test.want)
}
}
}
})
}
}

Expand Down Expand Up @@ -181,3 +192,58 @@ func TestProbeWithBody(t *testing.T) {
t.Errorf("response map: got=%s, expected=%s", got, expected)
}
}

func TestMultipleTargetsMultipleRequests(t *testing.T) {
testTargets := []string{"test.com", "fail-test.com"}
reqPerProbe := int64(6)
opts := &options.Options{
Targets: targets.StaticTargets(strings.Join(testTargets, ",")),
Interval: 10 * time.Millisecond,
ProbeConf: &configpb.ProbeConf{RequestsPerProbe: proto.Int32(int32(reqPerProbe))},
}

p := &Probe{}
err := p.Init("http_test", opts)
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
p.client.Transport = newTestTransport()

// Verify that Init() created result struct for each target.
for _, tgt := range testTargets {
if _, ok := p.results[tgt]; !ok {
t.Errorf("didn't find results for the target: %s", tgt)
}
}

p.runProbe(context.Background())

wantSuccess := map[string]int64{
"test.com": reqPerProbe,
"fail-test.com": 0, // Test transport is configured to fail this.
}

for _, tgt := range testTargets {
if p.results[tgt].total != reqPerProbe {
t.Errorf("For target %s, total=%d, want=%d", tgt, p.results[tgt].total, reqPerProbe)
}
if p.results[tgt].success != wantSuccess[tgt] {
t.Errorf("For target %s, success=%d, want=%d", tgt, p.results[tgt].success, wantSuccess[tgt])
}
}

// Run again
p.runProbe(context.Background())

wantSuccess["test.com"] += reqPerProbe

for _, tgt := range testTargets {
if p.results[tgt].total != 2*reqPerProbe {
t.Errorf("For target %s, total=%d, want=%d", tgt, p.results[tgt].total, reqPerProbe)
}
if p.results[tgt].success != wantSuccess[tgt] {
t.Errorf("For target %s, success=%d, want=%d", tgt, p.results[tgt].success, wantSuccess[tgt])
}
}
}
7 changes: 4 additions & 3 deletions probes/http/proto/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions probes/http/proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ message ProbeConf {
// presented by the server for any host name will be accepted.
optional bool disable_cert_validation = 14;

// Requests per probe (Deprecated).
// NOTE: This field is now deprecated and will be removed after the v0.10.3
// releases.
// Requests per probe.
// Number of HTTP requests per probe. Requests are executed concurrently and
// each HTTP re contributes to probe results. For example, if you run two
// requests per probe, "total" counter will be incremented by 2.
optional int32 requests_per_probe = 98 [default = 1];

// How long to wait between two requests to the same target
// NOTE: This field is now deprecated and will be removed after the v0.10.3
// releases.
Expand Down
Loading

0 comments on commit 5d8563a

Please sign in to comment.