Skip to content

Commit

Permalink
Cherry-pick #17735 to 7.7: Fix issue 17734 to retry on rate-limit err…
Browse files Browse the repository at this point in the history
…or in the Filebeat httpjson input. (#17839)

* Filebeat Httpjson input: Fix issue 17734 to retry on rate-limit error (#17735)

* Fix issue 17734 to add retry for rate-limit error.

* Add test case for rate-limit error retry.

* Make isRetry a local variable in createCustomServer function.

(cherry picked from commit bcbabdb)

* Fix CHANGELOG.next.asciidoc
  • Loading branch information
Lei Qiu committed Apr 20, 2020
1 parent 81ce564 commit 9f9f95a
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix default index pattern in IBM MQ filebeat dashboard. {pull}17146[17146]
- Fix `elasticsearch.gc` fileset to not collect _all_ logs when Elasticsearch is running in Docker. {issue}13164[13164] {issue}16583[16583] {pull}17164[17164]
- Fixed a mapping exception when ingesting CEF logs that used the spriv or dpriv extensions. {issue}17216[17216] {pull}17220[17220]
- Fix issue 17734 to retry on rate-limit error in the Filebeat httpjson input. {issue}17734[17734] {pull}17735[17735]

*Heartbeat*

Expand Down
81 changes: 68 additions & 13 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"net/http"
"net/http/httptest"
"regexp"
"strconv"
"sync"
"testing"
"time"

"golang.org/x/sync/errgroup"

Expand All @@ -36,14 +38,8 @@ func testSetup(t *testing.T) {
})
}

func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) {
testSetup(t)
// Create an http test server according to whether TLS is used
var newServer = httptest.NewServer
if isTLS {
newServer = httptest.NewTLSServer
}
ts := newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
func createServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server {
return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
req, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
Expand Down Expand Up @@ -72,6 +68,44 @@ func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input
w.Write(b)
}
}))
}

func createCustomServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server {
var isRetry bool
return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if !isRetry {
w.Header().Set("X-Rate-Limit-Limit", "0")
w.Header().Set("X-Rate-Limit-Remaining", "0")
w.Header().Set("X-Rate-Limit-Reset", strconv.FormatInt(time.Now().Unix(), 10))
w.WriteHeader(http.StatusTooManyRequests)
w.Write([]byte{})
isRetry = true
} else {
message := map[string]interface{}{
"hello": "world",
"embedded": map[string]string{
"hello": "world",
},
}
b, _ := json.Marshal(message)
w.WriteHeader(http.StatusOK)
w.Write(b)
}
}))
}

func runTest(t *testing.T, isTLS bool, testRateLimitRetry bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) {
testSetup(t)
// Create an http test server according to whether TLS is used
var newServer = httptest.NewServer
if isTLS {
newServer = httptest.NewTLSServer
}
ts := createServer(newServer)
if testRateLimitRetry {
ts = createCustomServer(newServer)
}
defer ts.Close()
m["url"] = ts.URL
cfg := common.MustNewConfigFrom(m)
Expand Down Expand Up @@ -337,7 +371,7 @@ func TestGET(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -359,7 +393,28 @@ func TestGetHTTPS(t *testing.T) {
"interval": 0,
"ssl.verification_mode": "none",
}
runTest(t, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, true, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

events, ok := out.waitForEvents(1)
if !ok {
t.Fatalf("Expected 1 events, but got %d.", len(events))
}
input.Stop()

if err := group.Wait(); err != nil {
t.Fatal(err)
}
})
}

func TestRateLimitRetry(t *testing.T) {
m := map[string]interface{}{
"http_method": "GET",
"interval": 0,
}
runTest(t, false, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -381,7 +436,7 @@ func TestPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 0,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -403,7 +458,7 @@ func TestRepeatedPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 10 ^ 9,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -424,7 +479,7 @@ func TestRunStop(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
input.Run()
input.Stop()
input.Run()
Expand Down
6 changes: 6 additions & 0 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
}
if msg.StatusCode != http.StatusOK {
in.log.Debugw("HTTP request failed", "http.response.status_code", msg.StatusCode, "http.response.body", string(responseData))
if msg.StatusCode == http.StatusTooManyRequests {
if err = in.applyRateLimit(ctx, header, in.config.RateLimit); err != nil {
return err
}
continue
}
return errors.Errorf("http request was unsuccessful with a status code %d", msg.StatusCode)
}
var m, v interface{}
Expand Down

0 comments on commit 9f9f95a

Please sign in to comment.