From 7cee213ae28fbd462a3830597dc765681f364b17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Saint-F=C3=A9lix?= Date: Wed, 21 Jul 2021 17:40:27 +0200 Subject: [PATCH 1/7] Client: Improve product check Differentiate unspported/unknown products Typed Errors --- elasticsearch.go | 72 ++++++++++++++++++++++++++++------ elasticsearch_internal_test.go | 38 +++++++++++++++++- 2 files changed, 98 insertions(+), 12 deletions(-) diff --git a/elasticsearch.go b/elasticsearch.go index f444cfa1e3..c7b6daa16f 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -47,9 +47,10 @@ func init() { } const ( - defaultURL = "http://localhost:9200" - tagline = "You Know, for Search" - unknownProduct = "the client noticed that the server is not Elasticsearch and we do not support this unknown product" + defaultURL = "http://localhost:9200" + tagline = "You Know, for Search" + unknownProduct = "the client noticed that the server is not Elasticsearch and we do not support this unknown product" + unsupportedProduct = "The client noticed that the server is not a supported distribution of Elasticsearch" ) // Version returns the package version as a string. @@ -121,6 +122,32 @@ type info struct { Tagline string `json:"tagline"` } +type errContactEs struct { + message string +} + +func newErrContactEs(message string) errContactEs { + return errContactEs{message: message} +} + +func (e errContactEs) Error() string { + return e.message +} + +type ErrProductCheck struct { + message string +} + +func newErrProductCheck(message string) ErrProductCheck { + return ErrProductCheck{message: message} +} + +func (e ErrProductCheck) Error() string { + return e.message +} + + + // NewDefaultClient creates a new client with default options. // // It will use http://localhost:9200 as the default address. @@ -232,7 +259,7 @@ func NewClient(cfg Config) (*Client, error) { // func genuineCheckHeader(header http.Header) error { if header.Get("X-Elastic-Product") != "Elasticsearch" { - return fmt.Errorf(unknownProduct) + return newErrProductCheck(unknownProduct) } return nil } @@ -246,18 +273,19 @@ func genuineCheckInfo(info info) error { } if major < 6 { - return fmt.Errorf(unknownProduct) + return newErrProductCheck(unknownProduct) } if major < 7 { if info.Tagline != tagline { - return fmt.Errorf(unknownProduct) + return newErrProductCheck(unknownProduct) } } if major >= 7 { if minor < 14 { - if info.Tagline != tagline || - info.Version.BuildFlavor != "default" { - return fmt.Errorf(unknownProduct) + if info.Tagline != tagline { + return newErrProductCheck(unknownProduct) + } else if info.Version.BuildFlavor != "default" { + return newErrProductCheck(unsupportedProduct) } } } @@ -299,7 +327,9 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { if err == nil { checkHeader := func() error { return genuineCheckHeader(res.Header) } if err := c.doProductCheck(checkHeader); err != nil { - res.Body.Close() + if res.Body != nil { + res.Body.Close() + } return nil, err } } @@ -312,18 +342,34 @@ func (c *Client) doProductCheck(f func() error) error { c.productCheckMu.RLock() productCheckSuccess := c.productCheckSuccess c.productCheckMu.RUnlock() + if productCheckSuccess { return nil } + c.productCheckMu.Lock() defer c.productCheckMu.Unlock() + if c.productCheckSuccess { return nil } + if err := f(); err != nil { - return err + // At this point either the check: + // - could not make contact with Elasticsearch + // in which case we should retry on the next request. + // - determined the cluster is not supported + // then we return the error to the user. + switch err.(type) { + case errContactEs: + return nil + default: + return err + } } + c.productCheckSuccess = true + return nil } @@ -338,6 +384,10 @@ func (c *Client) productCheck() error { return err } + if res.IsError() { + return newErrContactEs("cannot retrieve info from Elasticsearch") + } + contentType := res.Header.Get("Content-Type") if res.Body != nil { defer res.Body.Close() diff --git a/elasticsearch_internal_test.go b/elasticsearch_internal_test.go index 941c38dce0..8f16b36836 100644 --- a/elasticsearch_internal_test.go +++ b/elasticsearch_internal_test.go @@ -471,6 +471,7 @@ func TestGenuineCheckInfo(t *testing.T) { name string info info wantErr bool + err error }{ { name: "Genuine Elasticsearch 7.14.0", @@ -482,6 +483,7 @@ func TestGenuineCheckInfo(t *testing.T) { Tagline: "You Know, for Search", }, wantErr: false, + err: nil, }, { name: "Genuine Elasticsearch 6.15.1", @@ -493,6 +495,7 @@ func TestGenuineCheckInfo(t *testing.T) { Tagline: "You Know, for Search", }, wantErr: false, + err: nil, }, { name: "Not so genuine Elasticsearch 7 major", @@ -504,6 +507,7 @@ func TestGenuineCheckInfo(t *testing.T) { Tagline: "You Know, for Search", }, wantErr: true, + err: errors.New(unknownProduct), }, { name: "Not so genuine Elasticsearch 6 major", @@ -515,6 +519,7 @@ func TestGenuineCheckInfo(t *testing.T) { Tagline: "You Know, for Fun", }, wantErr: true, + err: errors.New(unknownProduct), }, { name: "Way older Elasticsearch major", @@ -526,11 +531,24 @@ func TestGenuineCheckInfo(t *testing.T) { Tagline: "You Know, for Fun", }, wantErr: true, + err: errors.New(unknownProduct), + }, + { + name: "Elasticsearch oss", + info: info{ + Version: esVersion{ + Number: "7.10.0", + BuildFlavor: "oss", + }, + Tagline: "You Know, for Search", + }, + wantErr: true, + err: errors.New(unsupportedProduct), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := genuineCheckInfo(tt.info); (err != nil) != tt.wantErr { + if err := genuineCheckInfo(tt.info); (err != nil) != tt.wantErr && err != tt.err { t.Errorf("genuineCheckInfo() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -621,6 +639,24 @@ func TestResponseCheckOnly(t *testing.T) { requestErr: errors.New("request failed"), wantErr: true, }, + { + name: "Valid request, 500 response", + useResponseCheckOnly: false, + response: &http.Response{ + StatusCode: http.StatusInternalServerError, + }, + requestErr: nil, + wantErr: true, + }, + { + name: "Valid request, 404 response", + useResponseCheckOnly: false, + response: &http.Response{ + StatusCode: http.StatusNotFound, + }, + requestErr: nil, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From aa51011d81f0dd91fa5aaba08841f64c11193c28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Saint-F=C3=A9lix?= Date: Wed, 21 Jul 2021 18:34:02 +0200 Subject: [PATCH 2/7] Client: Add tests to reflect retry intended behavior --- elasticsearch.go | 24 +++++++-------- elasticsearch_internal_test.go | 55 +++++++++++++++++++++++++++++----- 2 files changed, 60 insertions(+), 19 deletions(-) diff --git a/elasticsearch.go b/elasticsearch.go index c7b6daa16f..51cc17d87e 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -324,8 +324,13 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { res, err := c.Transport.Perform(req) // ResponseCheck path continues, we run the header check on the first answer from ES. - if err == nil { - checkHeader := func() error { return genuineCheckHeader(res.Header) } + if c.useResponseCheckOnly { + checkHeader := func() error { + if res != nil { + return genuineCheckHeader(res.Header) + } + return nil + } if err := c.doProductCheck(checkHeader); err != nil { if res.Body != nil { res.Body.Close() @@ -400,19 +405,14 @@ func (c *Client) productCheck() error { } } - switch res.StatusCode { - case http.StatusForbidden: - case http.StatusUnauthorized: - break - default: - err = genuineCheckHeader(res.Header) + err = genuineCheckHeader(res.Header) - if err != nil { - if info.Version.Number != "" { - err = genuineCheckInfo(info) - } + if err != nil { + if info.Version.Number != "" { + err = genuineCheckInfo(info) } } + if err != nil { return err } diff --git a/elasticsearch_internal_test.go b/elasticsearch_internal_test.go index 8f16b36836..669571c948 100644 --- a/elasticsearch_internal_test.go +++ b/elasticsearch_internal_test.go @@ -646,7 +646,7 @@ func TestResponseCheckOnly(t *testing.T) { StatusCode: http.StatusInternalServerError, }, requestErr: nil, - wantErr: true, + wantErr: false, }, { name: "Valid request, 404 response", @@ -655,7 +655,7 @@ func TestResponseCheckOnly(t *testing.T) { StatusCode: http.StatusNotFound, }, requestErr: nil, - wantErr: true, + wantErr: false, }, } for _, tt := range tests { @@ -690,16 +690,57 @@ func TestProductCheckError(t *testing.T) { defer server.Close() c, _ := NewClient(Config{Addresses: []string{server.URL}, DisableRetry: true}) - if _, err := c.Cat.Indices(); err == nil { - t.Fatal("expected error") + if _, err := c.Cat.Indices(); err != nil { + t.Fatal("first unexpected error") } if _, err := c.Cat.Indices(); err != nil { - t.Fatalf("unexpected error: %s", err) + t.Fatalf("second unexpected error: %s", err) } - if n := len(requestPaths); n != 3 { + if n := len(requestPaths); n != 4 { t.Fatalf("expected 3 requests, got %d", n) } - if !reflect.DeepEqual(requestPaths, []string{"/", "/", "/_cat/indices"}) { + if !reflect.DeepEqual(requestPaths, []string{"/", "/_cat/indices", "/", "/_cat/indices"}) { t.Fatalf("unexpected request paths: %s", requestPaths) } } + + +func TestProductCheckRetry(t *testing.T) { + var requestPaths []string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestPaths = append(requestPaths, r.URL.Path) + if len(requestPaths) < 4 { + // Simulate transient error from a proxy on the first request. + // This must not be cached by the client. + w.WriteHeader(http.StatusBadGateway) + return + } + w.Header().Set("X-Elastic-Product", "Elasticsearch") + w.Write([]byte("{}")) + })) + defer server.Close() + + c, _ := NewClient(Config{Addresses: []string{server.URL}, DisableRetry: true}) + if _, err := c.Cat.Indices(); err != nil { + t.Fatal("first unexpected error") + } + if _, err := c.Cat.Indices(); err != nil { + t.Fatalf("second unexpected error: %s", err) + } + if c.productCheckSuccess { + t.Fatalf("product check should not be valid at this point") + } + if _, err := c.Cat.Indices(); err != nil { + t.Fatalf("third unexpected error: %s", err) + } + if n := len(requestPaths); n != 6 { + t.Fatalf("expected 3 requests, got %d", n) + } + if !reflect.DeepEqual(requestPaths, []string{"/", "/_cat/indices", "/", "/_cat/indices", "/", "/_cat/indices"}) { + t.Fatalf("unexpected request paths: %s", requestPaths) + } + + if !c.productCheckSuccess { + t.Fatalf("product check should be valid") + } +} \ No newline at end of file From e7ef98c1eb0ef5dfc1c46a457ecc51a965bea597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Saint-F=C3=A9lix?= Date: Wed, 21 Jul 2021 18:55:41 +0200 Subject: [PATCH 3/7] Client: Add missing comment for ErrProductCheck --- elasticsearch.go | 1 + 1 file changed, 1 insertion(+) diff --git a/elasticsearch.go b/elasticsearch.go index 51cc17d87e..7257dd29aa 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -134,6 +134,7 @@ func (e errContactEs) Error() string { return e.message } +// ErrProductCheck is returned if the Elasticsearch cluster is not supported. type ErrProductCheck struct { message string } From 3ba865418b71a0341ea419794f7ff526e107a49f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Saint-F=C3=A9lix?= Date: Thu, 22 Jul 2021 10:29:53 +0200 Subject: [PATCH 4/7] Client: Simply handle http errors --- elasticsearch.go | 52 +++++--------------------------- elasticsearch_internal_test.go | 55 +++++++--------------------------- 2 files changed, 18 insertions(+), 89 deletions(-) diff --git a/elasticsearch.go b/elasticsearch.go index 7257dd29aa..4542ec1e16 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -50,7 +50,7 @@ const ( defaultURL = "http://localhost:9200" tagline = "You Know, for Search" unknownProduct = "the client noticed that the server is not Elasticsearch and we do not support this unknown product" - unsupportedProduct = "The client noticed that the server is not a supported distribution of Elasticsearch" + unsupportedProduct = "the client noticed that the server is not a supported distribution of Elasticsearch" ) // Version returns the package version as a string. @@ -122,32 +122,6 @@ type info struct { Tagline string `json:"tagline"` } -type errContactEs struct { - message string -} - -func newErrContactEs(message string) errContactEs { - return errContactEs{message: message} -} - -func (e errContactEs) Error() string { - return e.message -} - -// ErrProductCheck is returned if the Elasticsearch cluster is not supported. -type ErrProductCheck struct { - message string -} - -func newErrProductCheck(message string) ErrProductCheck { - return ErrProductCheck{message: message} -} - -func (e ErrProductCheck) Error() string { - return e.message -} - - // NewDefaultClient creates a new client with default options. // @@ -260,7 +234,7 @@ func NewClient(cfg Config) (*Client, error) { // func genuineCheckHeader(header http.Header) error { if header.Get("X-Elastic-Product") != "Elasticsearch" { - return newErrProductCheck(unknownProduct) + return errors.New(unknownProduct) } return nil } @@ -274,19 +248,19 @@ func genuineCheckInfo(info info) error { } if major < 6 { - return newErrProductCheck(unknownProduct) + return errors.New(unknownProduct) } if major < 7 { if info.Tagline != tagline { - return newErrProductCheck(unknownProduct) + return errors.New(unknownProduct) } } if major >= 7 { if minor < 14 { if info.Tagline != tagline { - return newErrProductCheck(unknownProduct) + return errors.New(unknownProduct) } else if info.Version.BuildFlavor != "default" { - return newErrProductCheck(unsupportedProduct) + return errors.New(unsupportedProduct) } } } @@ -361,17 +335,7 @@ func (c *Client) doProductCheck(f func() error) error { } if err := f(); err != nil { - // At this point either the check: - // - could not make contact with Elasticsearch - // in which case we should retry on the next request. - // - determined the cluster is not supported - // then we return the error to the user. - switch err.(type) { - case errContactEs: - return nil - default: - return err - } + return err } c.productCheckSuccess = true @@ -391,7 +355,7 @@ func (c *Client) productCheck() error { } if res.IsError() { - return newErrContactEs("cannot retrieve info from Elasticsearch") + return fmt.Errorf("cannot retrieve info from Elasticsearch") } contentType := res.Header.Get("Content-Type") diff --git a/elasticsearch_internal_test.go b/elasticsearch_internal_test.go index 669571c948..08f4a5827d 100644 --- a/elasticsearch_internal_test.go +++ b/elasticsearch_internal_test.go @@ -646,7 +646,7 @@ func TestResponseCheckOnly(t *testing.T) { StatusCode: http.StatusInternalServerError, }, requestErr: nil, - wantErr: false, + wantErr: true, }, { name: "Valid request, 404 response", @@ -655,7 +655,7 @@ func TestResponseCheckOnly(t *testing.T) { StatusCode: http.StatusNotFound, }, requestErr: nil, - wantErr: false, + wantErr: true, }, } for _, tt := range tests { @@ -690,57 +690,22 @@ func TestProductCheckError(t *testing.T) { defer server.Close() c, _ := NewClient(Config{Addresses: []string{server.URL}, DisableRetry: true}) - if _, err := c.Cat.Indices(); err != nil { - t.Fatal("first unexpected error") - } - if _, err := c.Cat.Indices(); err != nil { - t.Fatalf("second unexpected error: %s", err) - } - if n := len(requestPaths); n != 4 { - t.Fatalf("expected 3 requests, got %d", n) - } - if !reflect.DeepEqual(requestPaths, []string{"/", "/_cat/indices", "/", "/_cat/indices"}) { - t.Fatalf("unexpected request paths: %s", requestPaths) - } -} - - -func TestProductCheckRetry(t *testing.T) { - var requestPaths []string - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - requestPaths = append(requestPaths, r.URL.Path) - if len(requestPaths) < 4 { - // Simulate transient error from a proxy on the first request. - // This must not be cached by the client. - w.WriteHeader(http.StatusBadGateway) - return - } - w.Header().Set("X-Elastic-Product", "Elasticsearch") - w.Write([]byte("{}")) - })) - defer server.Close() - - c, _ := NewClient(Config{Addresses: []string{server.URL}, DisableRetry: true}) - if _, err := c.Cat.Indices(); err != nil { - t.Fatal("first unexpected error") - } - if _, err := c.Cat.Indices(); err != nil { - t.Fatalf("second unexpected error: %s", err) + if _, err := c.Cat.Indices(); err == nil { + t.Fatal("expected error") } if c.productCheckSuccess { - t.Fatalf("product check should not be valid at this point") + t.Fatalf("product check should be invalid, got %v", c.productCheckSuccess) } if _, err := c.Cat.Indices(); err != nil { - t.Fatalf("third unexpected error: %s", err) + t.Fatalf("unexpected error: %s", err) } - if n := len(requestPaths); n != 6 { + if n := len(requestPaths); n != 3 { t.Fatalf("expected 3 requests, got %d", n) } - if !reflect.DeepEqual(requestPaths, []string{"/", "/_cat/indices", "/", "/_cat/indices", "/", "/_cat/indices"}) { + if !reflect.DeepEqual(requestPaths, []string{"/", "/", "/_cat/indices"}) { t.Fatalf("unexpected request paths: %s", requestPaths) } - if !c.productCheckSuccess { - t.Fatalf("product check should be valid") + t.Fatalf("product check should be valid, got : %v", c.productCheckSuccess) } -} \ No newline at end of file +} From 3c5873eca1a366893d134306d5b81d113a579645 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Saint-F=C3=A9lix?= Date: Thu, 22 Jul 2021 10:34:54 +0200 Subject: [PATCH 5/7] Client: Restore previous condtionnal postflight check --- elasticsearch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elasticsearch.go b/elasticsearch.go index 4542ec1e16..4a6830126a 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -299,7 +299,7 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { res, err := c.Transport.Perform(req) // ResponseCheck path continues, we run the header check on the first answer from ES. - if c.useResponseCheckOnly { + if err == nil { checkHeader := func() error { if res != nil { return genuineCheckHeader(res.Header) From 1f8f793139d0f99de28ece6795b4a4da5cb756b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Saint-F=C3=A9lix?= Date: Thu, 22 Jul 2021 10:45:03 +0200 Subject: [PATCH 6/7] API: Add Info response in API Benchmarks --- esapi/esapi_benchmark_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/esapi/esapi_benchmark_test.go b/esapi/esapi_benchmark_test.go index bb447aeac9..78d7a4f832 100644 --- a/esapi/esapi_benchmark_test.go +++ b/esapi/esapi_benchmark_test.go @@ -38,7 +38,13 @@ var ( Body: ioutil.NopCloser(strings.NewReader("MOCK")), } defaultRoundTripFn = func(*http.Request) (*http.Response, error) { return defaultResponse, nil } - errorRoundTripFn = func(*http.Request) (*http.Response, error) { + errorRoundTripFn = func(request *http.Request) (*http.Response, error) { + if request.URL.Path == "/" { + return &http.Response{ + StatusCode: 200, + Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, + }, nil + } return &http.Response{ Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, StatusCode: 400, From 4bd9d9fa6b286110e748354b14dfa7cf1cea371f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Saint-F=C3=A9lix?= Date: Thu, 22 Jul 2021 10:57:19 +0200 Subject: [PATCH 7/7] Client: Restore postflight check --- elasticsearch.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/elasticsearch.go b/elasticsearch.go index 4a6830126a..a438d367d7 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -300,16 +300,9 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { // ResponseCheck path continues, we run the header check on the first answer from ES. if err == nil { - checkHeader := func() error { - if res != nil { - return genuineCheckHeader(res.Header) - } - return nil - } + checkHeader := func() error { return genuineCheckHeader(res.Header) } if err := c.doProductCheck(checkHeader); err != nil { - if res.Body != nil { - res.Body.Close() - } + res.Body.Close() return nil, err } }