Skip to content

Commit

Permalink
Filebeat Httpjson input: Fix issue 17734 to retry on rate-limit error (
Browse files Browse the repository at this point in the history
…#17735)

* Fix issue 17734 to add retry for rate-limit error.

* Add test case for rate-limit error retry.

* Make isRetry a local variable in createCustomServer function.
  • Loading branch information
Lei Qiu committed Apr 20, 2020
1 parent a16a2a3 commit bcbabdb
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `elasticsearch.audit` data ingest pipeline to be more forgiving with date formats found in Elasticsearch audit logs. {pull}17406[17406]
- Fixed activemq module causing "regular expression has redundant nested repeat operator" warning in Elasticsearch. {pull}17428[17428]
- Remove migrationVersion map 7.7.0 reference from Kibana dashboard file to fix backward compatibility issues. {pull}17425[17425]
- Fix issue 17734 to retry on rate-limit error in the Filebeat httpjson input. {issue}17734[17734] {pull}17735[17735]

*Heartbeat*

Expand Down
81 changes: 68 additions & 13 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"net/http"
"net/http/httptest"
"regexp"
"strconv"
"sync"
"testing"
"time"

"golang.org/x/sync/errgroup"

Expand All @@ -36,14 +38,8 @@ func testSetup(t *testing.T) {
})
}

func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) {
testSetup(t)
// Create an http test server according to whether TLS is used
var newServer = httptest.NewServer
if isTLS {
newServer = httptest.NewTLSServer
}
ts := newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
func createServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server {
return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
req, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
Expand Down Expand Up @@ -72,6 +68,44 @@ func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input
w.Write(b)
}
}))
}

func createCustomServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server {
var isRetry bool
return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if !isRetry {
w.Header().Set("X-Rate-Limit-Limit", "0")
w.Header().Set("X-Rate-Limit-Remaining", "0")
w.Header().Set("X-Rate-Limit-Reset", strconv.FormatInt(time.Now().Unix(), 10))
w.WriteHeader(http.StatusTooManyRequests)
w.Write([]byte{})
isRetry = true
} else {
message := map[string]interface{}{
"hello": "world",
"embedded": map[string]string{
"hello": "world",
},
}
b, _ := json.Marshal(message)
w.WriteHeader(http.StatusOK)
w.Write(b)
}
}))
}

func runTest(t *testing.T, isTLS bool, testRateLimitRetry bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) {
testSetup(t)
// Create an http test server according to whether TLS is used
var newServer = httptest.NewServer
if isTLS {
newServer = httptest.NewTLSServer
}
ts := createServer(newServer)
if testRateLimitRetry {
ts = createCustomServer(newServer)
}
defer ts.Close()
m["url"] = ts.URL
cfg := common.MustNewConfigFrom(m)
Expand Down Expand Up @@ -337,7 +371,7 @@ func TestGET(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -359,7 +393,28 @@ func TestGetHTTPS(t *testing.T) {
"interval": 0,
"ssl.verification_mode": "none",
}
runTest(t, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, true, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

events, ok := out.waitForEvents(1)
if !ok {
t.Fatalf("Expected 1 events, but got %d.", len(events))
}
input.Stop()

if err := group.Wait(); err != nil {
t.Fatal(err)
}
})
}

func TestRateLimitRetry(t *testing.T) {
m := map[string]interface{}{
"http_method": "GET",
"interval": 0,
}
runTest(t, false, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -381,7 +436,7 @@ func TestPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 0,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -403,7 +458,7 @@ func TestRepeatedPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 10 ^ 9,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -424,7 +479,7 @@ func TestRunStop(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
input.Run()
input.Stop()
input.Run()
Expand Down
6 changes: 6 additions & 0 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
}
if msg.StatusCode != http.StatusOK {
in.log.Debugw("HTTP request failed", "http.response.status_code", msg.StatusCode, "http.response.body", string(responseData))
if msg.StatusCode == http.StatusTooManyRequests {
if err = in.applyRateLimit(ctx, header, in.config.RateLimit); err != nil {
return err
}
continue
}
return errors.Errorf("http request was unsuccessful with a status code %d", msg.StatusCode)
}
var m, v interface{}
Expand Down

0 comments on commit bcbabdb

Please sign in to comment.