Improve error when ES Ingest node plugins are not loaded #3676

Merged 3 commits on Mar 1, 2017

Changes from 2 commits
@@ -1,10 +1,12 @@
package fileset

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
@@ -245,7 +247,7 @@ func (reg *ModuleRegistry) GetProspectorConfigs() ([]*common.Config, error) {
// PipelineLoader is a subset of the Elasticsearch client API capable of loading
// the pipelines.

ruflin (Collaborator), Feb 27, 2017

I would use error as the second return argument

tsg (Author, Collaborator), Feb 27, 2017

I updated this to have the error as the last returned arg.

type PipelineLoader interface {
-LoadJSON(path string, json map[string]interface{}) error
+LoadJSON(path string, json map[string]interface{}) ([]byte, error)
Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
}

@@ -273,14 +275,78 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string
logp.Debug("modules", "Pipeline %s already loaded", pipelineID)
return nil
}
-err := esClient.LoadJSON(path, content)
+body, err := esClient.LoadJSON(path, content)
if err != nil {
-return fmt.Errorf("couldn't load template: %v", err)
+return interpretError(err, body)
}
logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID)
return nil
}

func interpretError(initialErr error, body []byte) error {
var response struct {
Error struct {
RootCause []struct {
Type string `json:"type"`
Reason string `json:"reason"`
Header struct {
ProcessorType string `json:"processor_type"`
} `json:"header"`
Index string `json:"index"`
} `json:"root_cause"`
} `json:"error"`
}
err := json.Unmarshal(body, &response)
if err != nil {

ruflin (Collaborator), Feb 27, 2017

client.Connection.version could be used to check the ES version. Unfortunately the variable is currently not public.

tsg (Author, Collaborator), Feb 27, 2017

Hmm, yeah, that would be an option. But we also don't have the client here and adding it would complicate unit testing, and we still need to do the error checking anyway, so I think it doesn't win us that much to add version checks.
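The version check ruflin mentions would look roughly like the sketch below. This is an assumption, not code from the PR: `atLeastMajor` is a hypothetical helper, since the real client keeps its version field private; it only illustrates the trade-off tsg describes.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// atLeastMajor reports whether a version string such as "5.2.1" has a
// major version of at least min. Hypothetical helper: the Elasticsearch
// client does not expose its version publicly, so this only sketches
// what the check would involve.
func atLeastMajor(version string, min int) bool {
	parts := strings.SplitN(version, ".", 2)
	major, err := strconv.Atoi(parts[0])
	if err != nil {
		// Unparseable version: fail closed rather than guess.
		return false
	}
	return major >= min
}

func main() {
	fmt.Println(atLeastMajor("5.2.1", 5)) // true
	fmt.Println(atLeastMajor("2.4.0", 5)) // false
}
```

Even with such a check, the response-body inspection below is still needed for the missing-plugin case, which is version-independent.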

// this might be ES < 2.0. Do a best effort to check for ES 1.x
var response1x struct {
Error string `json:"error"`
}
json.Unmarshal(body, &response1x)

urso (Collaborator), Feb 28, 2017

json.Unmarshal might also fail, because the body might not be JSON at all. The client was changed to always return the raw body; e.g. if the error comes from an nginx proxy in front of ES, the content might be a plain-text message.

tsg (Author, Collaborator), Feb 28, 2017

I know, but in that case we fall back to the "Could not load pipeline. Additionally, error decoding body:" message, which I think is what we want. Looking into making the code clearer.

if response1x.Error != "" {
return fmt.Errorf("The Filebeat modules require Elasticsearch >= 5.0. "+
"This is the response I got from Elasticsearch: %s", body)
}

return fmt.Errorf("couldn't load pipeline: %v. Additionally, error decoding response body: %s.",
initialErr, body)
}

// missing plugins?
if len(response.Error.RootCause) > 0 &&
response.Error.RootCause[0].Type == "parse_exception" &&

ruflin (Collaborator), Feb 27, 2017

ES should (in the future) expose here a special root cause type so no check of the text is needed.

tsg (Author, Collaborator), Feb 27, 2017

Yeah, I think mvg is working on improving that.

strings.HasPrefix(response.Error.RootCause[0].Reason, "No processor type exists with name") &&

urso (Collaborator), Feb 28, 2017

Instead of checking the error message prefix, is there an error type we can check?

tsg (Author, Collaborator), Feb 28, 2017

The error type is parse_exception, so that's too generic.

@martijnvg are you adding a new error type as part of your PRs?

martijnvg (Member), Feb 28, 2017

The PR doesn't change that. It failed during parsing hence the error type is parse_exception.

I'm open to changing this, but I'm not sure what other existing error type to use. ES is very defensive about introducing new error types. The only existing general error type that comes to mind is resource_not_found. It is generic too, but maybe better in this context? (Indicating that a processor type doesn't exist.)

tsg (Author, Collaborator), Feb 28, 2017

I think I'd prefer not changing it in that case, because we'd still have to leave this branch in the code for ES < 5.4, so it's probably not worth it.

response.Error.RootCause[0].Header.ProcessorType != "" {

plugins := map[string]string{
"geoip": "ingest-geoip",
"user_agent": "ingest-user-agent",
}

urso (Collaborator), Feb 28, 2017

generalize the map with something like:

func ingestName(name string) string {
  return fmt.Sprintf("ingest-%v", strings.Replace(name, "_", "-", -1))
}

tsg (Author, Collaborator), Feb 28, 2017

Hmm, I don't know if that is future-proof. There's no guarantee that other plugins will follow this pattern.

plugin, ok := plugins[response.Error.RootCause[0].Header.ProcessorType]
if !ok {
return fmt.Errorf("This module requires an Elasticsearch plugin that provides the %s processor. "+
"Please visit the Elasticsearch documentation for instructions on how to install this plugin. "+
"Response body: %s", response.Error.RootCause[0].Header.ProcessorType, body)
}

return fmt.Errorf("This module requires the %s plugin to be installed in Elasticsearch. "+
"You can install it using the following command in the Elasticsearch home directory:\n"+
" sudo bin/elasticsearch-plugin install %s", plugin, plugin)
}

// older ES version?
if len(response.Error.RootCause) > 0 &&
response.Error.RootCause[0].Type == "invalid_index_name_exception" &&
response.Error.RootCause[0].Index == "_ingest" {

return fmt.Errorf("The Ingest Node functionality seems to be missing from Elasticsearch. "+
"The Filebeat modules require Elasticsearch >= 5.0. "+
"This is the response I got from Elasticsearch: %s", body)
}

urso (Collaborator), Feb 28, 2017

Which error message will be returned if ingest node is disabled?

tsg (Author, Collaborator), Feb 28, 2017

Just tried it out. There's no real way of disabling the ingest functionality, but what one can do is set node.ingest: false on all the ES nodes. In that case, the pipeline loading works as usual but the _bulk insert fails with:

path: /_bulk, params: {}
java.lang.IllegalStateException: There are no ingest nodes in this cluster, unable to forward request to an ingest node.

I'd say improving the error handling in that case is beyond the scope of this PR.

urso (Collaborator), Feb 28, 2017

Ouch. Maybe we can probe ingest node availability via the simulate API?

tsg (Author, Collaborator), Feb 28, 2017

Yeah, maybe, some other time :)
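The probe urso suggests could be sketched roughly as below. This is an assumption, not code from the PR: `hasIngest` interprets a hypothetical response from a request to the simulate endpoint (POST /_ingest/pipeline/_simulate), treating the same pre-5.0 invalid_index_name_exception signal that interpretError already matches on as "no ingest support". The exact heuristics are illustrative only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hasIngest decides, from a simulate-API response, whether the cluster
// appears to support Ingest Node. Hypothetical sketch: a 404 means the
// endpoint is unknown; a body whose error type is
// invalid_index_name_exception means a pre-5.0 node treated "_ingest"
// as an index name. Anything else is taken as "ingest available".
func hasIngest(status int, body []byte) bool {
	if status == 404 {
		return false
	}
	var resp struct {
		Error struct {
			Type string `json:"type"`
		} `json:"error"`
	}
	if err := json.Unmarshal(body, &resp); err != nil {
		// Non-JSON body (e.g. a proxy error page): can't confirm ingest.
		return false
	}
	return resp.Error.Type != "invalid_index_name_exception"
}

func main() {
	fmt.Println(hasIngest(200, []byte(`{"docs":[]}`)))                                       // true
	fmt.Println(hasIngest(400, []byte(`{"error":{"type":"invalid_index_name_exception"}}`))) // false
}
```

Note this still would not catch the node.ingest: false case tsg describes, where pipeline loading succeeds and only the _bulk request fails.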


return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body)
}

func (reg *ModuleRegistry) Empty() bool {
count := 0
for _, filesets := range reg.registry {
@@ -3,6 +3,7 @@
package fileset

import (
"errors"
"fmt"
"path/filepath"
"testing"
@@ -344,3 +345,44 @@ func TestMissingModuleFolder(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 0, len(prospectors))
}

func TestInterpretError(t *testing.T) {
tests := []struct {
Test string
Input string
Output string
}{
{
Test: "geoip not installed",
Input: `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [geoip]","header":{"processor_type":"geoip"}}],"type":"parse_exception","reason":"No processor type exists with name [geoip]","header":{"processor_type":"geoip"}},"status":400}`,
Output: "This module requires the ingest-geoip plugin to be installed in Elasticsearch. You can install it using the following command in the Elasticsearch home directory:\n sudo bin/elasticsearch-plugin install ingest-geoip",
},
{
Test: "user-agent not installed",
Input: `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [user_agent]","header":{"processor_type":"user_agent"}}],"type":"parse_exception","reason":"No processor type exists with name [user_agent]","header":{"processor_type":"user_agent"}},"status":400}`,
Output: "This module requires the ingest-user-agent plugin to be installed in Elasticsearch. You can install it using the following command in the Elasticsearch home directory:\n sudo bin/elasticsearch-plugin install ingest-user-agent",
},
{
Test: "other plugin not installed",
Input: `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}}],"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}},"status":400}`,
Output: "This module requires an Elasticsearch plugin that provides the hello_test processor. " +
"Please visit the Elasticsearch documentation for instructions on how to install this plugin. " +
"Response body: " + `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}}],"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}},"status":400}`,
},
{
Test: "Elasticsearch 2.4",
Input: `{"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"}],"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"},"status":400}`,
Output: `The Ingest Node functionality seems to be missing from Elasticsearch. The Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"}],"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"},"status":400}`,
},
{
Test: "Elasticsearch 1.7",
Input: `{"error":"InvalidIndexNameException[[_ingest] Invalid index name [_ingest], must not start with '_']","status":400}`,
Output: `The Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":"InvalidIndexNameException[[_ingest] Invalid index name [_ingest], must not start with '_']","status":400}`,
},
}

for _, test := range tests {
errResult := interpretError(errors.New("test"), []byte(test.Input))
assert.Equal(t, errResult.Error(), test.Output, test.Test)
}
}
@@ -10,7 +10,7 @@ import (
// DashboardLoader is a subset of the Elasticsearch client API capable of
// loading the dashboards.
type DashboardLoader interface {
-LoadJSON(path string, json map[string]interface{}) error
+LoadJSON(path string, json map[string]interface{}) ([]byte, error)
CreateIndex(index string, body interface{}) (int, *elasticsearch.QueryResult, error)
}

@@ -105,9 +105,9 @@ func (imp Importer) ImportJSONFile(fileType string, file string) error {
json.Unmarshal(reader, &jsonContent)
fileBase := strings.TrimSuffix(filepath.Base(file), filepath.Ext(file))

-err = imp.client.LoadJSON(path+"/"+fileBase, jsonContent)
+body, err := imp.client.LoadJSON(path+"/"+fileBase, jsonContent)
if err != nil {
-return fmt.Errorf("Failed to load %s under %s/%s: %s", file, path, fileBase, err)
+return fmt.Errorf("Failed to load %s under %s/%s: %s. Response body: %s", file, path, fileBase, err, body)
}

return nil
@@ -271,7 +271,7 @@ func (imp Importer) ImportSearch(file string) error {
path := "/" + imp.cfg.KibanaIndex + "/search/" + searchName
imp.statusMsg("Import search %s", file)

-if err = imp.client.LoadJSON(path, searchContent); err != nil {
+if _, err = imp.client.LoadJSON(path, searchContent); err != nil {
return err
}

@@ -301,7 +301,7 @@ func (imp Importer) ImportIndex(file string) error {
path := "/" + imp.cfg.KibanaIndex + "/index-pattern/" + indexName
imp.statusMsg("Import index to %s from %s\n", path, file)

-if err = imp.client.LoadJSON(path, indexContent); err != nil {
+if _, err = imp.client.LoadJSON(path, indexContent); err != nil {
return err
}
return nil
@@ -584,24 +584,24 @@ func (client *Client) PublishEvent(data outputs.Data) error {
func (client *Client) LoadTemplate(templateName string, template map[string]interface{}) error {

path := "/_template/" + templateName
-err := client.LoadJSON(path, template)
+body, err := client.LoadJSON(path, template)
if err != nil {
-return fmt.Errorf("couldn't load template: %v", err)
+return fmt.Errorf("couldn't load template: %v. Response body: %s", err, body)
}
logp.Info("Elasticsearch template with name '%s' loaded", templateName)
return nil
}

-func (client *Client) LoadJSON(path string, json map[string]interface{}) error {
-status, _, err := client.Request("PUT", path, "", nil, json)
+func (client *Client) LoadJSON(path string, json map[string]interface{}) ([]byte, error) {
+status, body, err := client.Request("PUT", path, "", nil, json)
if err != nil {
-return fmt.Errorf("couldn't load json. Error: %s", err)
+return body, fmt.Errorf("couldn't load json. Error: %s", err)
}
if status > 300 {
-return fmt.Errorf("couldn't load json. Status: %v", status)
+return body, fmt.Errorf("couldn't load json. Status: %v", status)
}

-return nil
+return body, nil
}

// CheckTemplate checks if a given template already exist. It returns true if
@@ -718,15 +718,16 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error)
defer closing(resp.Body)

status := resp.StatusCode
+var retErr error
if status >= 300 {
-return status, nil, fmt.Errorf("%v", resp.Status)
+retErr = fmt.Errorf("%v", resp.Status)
}

obj, err := ioutil.ReadAll(resp.Body)
if err != nil {
return status, nil, err

urso (Collaborator), Feb 28, 2017

Do we want to return retErr or err here?

tsg (Author, Collaborator), Feb 28, 2017

Hmm, debatable I guess. I'm changing it to retErr because I think that's closer to the previous behavior. Thanks.

}
-return status, obj, nil
+return status, obj, retErr
}

func closing(c io.Closer) {