Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filebeat Httpjson input: Fix issue 17734 to retry on rate-limit error #17735

Merged
merged 3 commits into from
Apr 20, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 70 additions & 15 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"net/http"
"net/http/httptest"
"regexp"
"strconv"
"sync"
"testing"
"time"

"golang.org/x/sync/errgroup"

Expand All @@ -25,8 +27,9 @@ import (
)

var (
once sync.Once
url string
once sync.Once
url string
isRetry bool
)

func testSetup(t *testing.T) {
Expand All @@ -36,14 +39,8 @@ func testSetup(t *testing.T) {
})
}

func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) {
testSetup(t)
// Create an http test server according to whether TLS is used
var newServer = httptest.NewServer
if isTLS {
newServer = httptest.NewTLSServer
}
ts := newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
func createServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server {
return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
req, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
Expand Down Expand Up @@ -72,6 +69,43 @@ func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input
w.Write(b)
}
}))
}

func createCustomServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server {
return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
w.Header().Set("Content-Type", "application/json")
if !isRetry {
w.Header().Set("X-Rate-Limit-Limit", "0")
w.Header().Set("X-Rate-Limit-Remaining", "0")
w.Header().Set("X-Rate-Limit-Reset", strconv.FormatInt(time.Now().Unix(), 10))
w.WriteHeader(http.StatusTooManyRequests)
w.Write([]byte{})
isRetry = true
} else {
message := map[string]interface{}{
"hello": "world",
"embedded": map[string]string{
"hello": "world",
},
}
b, _ := json.Marshal(message)
w.WriteHeader(http.StatusOK)
w.Write(b)
}
}))
}

func runTest(t *testing.T, isTLS bool, testRateLimitRetry bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) {
testSetup(t)
// Create an http test server according to whether TLS is used
var newServer = httptest.NewServer
if isTLS {
newServer = httptest.NewTLSServer
}
ts := createServer(newServer)
if testRateLimitRetry {
ts = createCustomServer(newServer)
}
defer ts.Close()
m["url"] = ts.URL
cfg := common.MustNewConfigFrom(m)
Expand Down Expand Up @@ -337,7 +371,7 @@ func TestGET(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -359,7 +393,28 @@ func TestGetHTTPS(t *testing.T) {
"interval": 0,
"ssl.verification_mode": "none",
}
runTest(t, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, true, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

events, ok := out.waitForEvents(1)
if !ok {
t.Fatalf("Expected 1 events, but got %d.", len(events))
}
input.Stop()

if err := group.Wait(); err != nil {
t.Fatal(err)
}
})
}

func TestRateLimitRetry(t *testing.T) {
m := map[string]interface{}{
"http_method": "GET",
"interval": 0,
}
runTest(t, false, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -381,7 +436,7 @@ func TestPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 0,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -403,7 +458,7 @@ func TestRepeatedPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 10 ^ 9,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -424,7 +479,7 @@ func TestRunStop(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
input.Run()
input.Stop()
input.Run()
Expand Down
6 changes: 6 additions & 0 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
}
if msg.StatusCode != http.StatusOK {
in.log.Debugw("HTTP request failed", "http.response.status_code", msg.StatusCode, "http.response.body", string(responseData))
if msg.StatusCode == http.StatusTooManyRequests {
if err = in.applyRateLimit(ctx, header, in.config.RateLimit); err != nil {
return err
}
continue
}
return errors.Errorf("http request was unsuccessful with a status code %d", msg.StatusCode)
}
var m, v interface{}
Expand Down