Skip to content
Permalink
Browse files

Improve error when ES ingest node plugins are not loaded

We're parsing the Elasticsearch JSON error and trying to produce an
error message that is as helpful as possible. The following cases
are detected:

* A plugin providing a processor is missing. In case the plugin is one of
  `ingest-geoip` or `ingest-user-agent`, we can also suggest the command that
  installs them.
* Elasticsearch < 5.0. We now detect this and tell the user that ES 5.0 is
  required by the Filebeat modules.

A drawback of this approach is that if both the GeoIP and User-Agent plugins
are missing, only one will be reported. This might get solved by including the
user-agent one in ES, or by improving the error we get from ES, or by us querying
the node stats API.

Note: this contains a change in the ES client, which makes it return the body
in case of errors. I think we need that part anyway, otherwise we often show
errors like `400 Bad request` without any other details. I tried to do a minimal
change there, I hope I didn't introduce any changes in behaviour.
  • Loading branch information...
tsg committed Feb 27, 2017
1 parent 1391731 commit 595dd4872eada72c80969c023f43eff6c7cc579f
@@ -1,10 +1,12 @@
package fileset

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
@@ -245,7 +247,7 @@ func (reg *ModuleRegistry) GetProspectorConfigs() ([]*common.Config, error) {
// PipelineLoader is a subset of the Elasticsearch client API capable of loading
// the pipelines.
type PipelineLoader interface {
	// LoadJSON PUTs the given JSON body to path. On failure it also returns
	// the raw response body so the caller can interpret the error.
	// NOTE(review): the (error, []byte) order is un-idiomatic Go (error
	// should come last), but it is the order the client implements.
	LoadJSON(path string, json map[string]interface{}) (error, []byte)
	Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
}

@@ -273,14 +275,78 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string
logp.Debug("modules", "Pipeline %s already loaded", pipelineID)
return nil
}
err := esClient.LoadJSON(path, content)
err, body := esClient.LoadJSON(path, content)
if err != nil {
return fmt.Errorf("couldn't load template: %v", err)
return interpretError(err, body)
}
logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID)
return nil
}

// interpretError turns an Elasticsearch error response into the most helpful
// message we can produce. It recognizes a missing ingest-plugin processor
// (suggesting the install command for the known geoip/user_agent plugins) and
// pre-5.0 clusters that lack the Ingest Node APIs; anything else falls back to
// a generic message carrying the raw response body.
func interpretError(initialErr error, body []byte) error {
	var resp struct {
		Error struct {
			RootCause []struct {
				Type   string `json:"type"`
				Reason string `json:"reason"`
				Header struct {
					ProcessorType string `json:"processor_type"`
				} `json:"header"`
				Index string `json:"index"`
			} `json:"root_cause"`
		} `json:"error"`
	}
	if err := json.Unmarshal(body, &resp); err != nil {
		// Not the 2.x+ error shape. Best effort: ES 1.x reports the error
		// as a plain string, so try that before giving up.
		var legacy struct {
			Error string `json:"error"`
		}
		json.Unmarshal(body, &legacy)
		if legacy.Error != "" {
			return fmt.Errorf("The Filebeat modules require Elasticsearch >= 5.0. "+
				"This is the response I got from Elasticsearch: %s", body)
		}

		return fmt.Errorf("couldn't load pipeline: %v. Additionally, error decoding response body: %s.",
			initialErr, body)
	}

	if len(resp.Error.RootCause) > 0 {
		cause := resp.Error.RootCause[0]

		// A processor named in the pipeline is provided by a plugin that
		// isn't installed on the ingest node.
		if cause.Type == "parse_exception" &&
			strings.HasPrefix(cause.Reason, "No processor type exists with name") &&
			cause.Header.ProcessorType != "" {

			knownPlugins := map[string]string{
				"geoip":      "ingest-geoip",
				"user_agent": "ingest-user-agent",
			}
			if plugin, found := knownPlugins[cause.Header.ProcessorType]; found {
				return fmt.Errorf("This module requires the %s plugin to be installed in Elasticsearch. "+
					"You can installing using the following command in the Elasticsearch home directory:\n"+
					" sudo bin/elasticsearch-plugin install %s", plugin, plugin)
			}
			return fmt.Errorf("This module requires an Elasticsearch plugin that provides the %s processor. "+
				"Please visit the Elasticsearch documentation for instructions on how to install this plugin. "+
				"Response body: %s", cause.Header.ProcessorType, body)
		}

		// ES 2.x has no ingest API, so PUT /_ingest/... is parsed as an
		// index name starting with '_', which is rejected.
		if cause.Type == "invalid_index_name_exception" && cause.Index == "_ingest" {
			return fmt.Errorf("The Ingest Node functionality seems to be missing from Elasticsearch. "+
				"The Filebeat modules require Elasticsearch >= 5.0. "+
				"This is the response I got from Elasticsearch: %s", body)
		}
	}

	return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body)
}

func (reg *ModuleRegistry) Empty() bool {
count := 0
for _, filesets := range reg.registry {
@@ -3,6 +3,7 @@
package fileset

import (
"errors"
"fmt"
"path/filepath"
"testing"
@@ -344,3 +345,44 @@ func TestMissingModuleFolder(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 0, len(prospectors))
}

// TestInterpretError checks that interpretError maps the various
// Elasticsearch error responses to the expected user-facing messages.
func TestInterpretError(t *testing.T) {
	tests := []struct {
		Test   string
		Input  string
		Output string
	}{
		{
			Test:   "geoip not installed",
			Input:  `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [geoip]","header":{"processor_type":"geoip"}}],"type":"parse_exception","reason":"No processor type exists with name [geoip]","header":{"processor_type":"geoip"}},"status":400}`,
			Output: "This module requires the ingest-geoip plugin to be installed in Elasticsearch. You can installing using the following command in the Elasticsearch home directory:\n sudo bin/elasticsearch-plugin install ingest-geoip",
		},
		{
			Test:   "user-agent not installed",
			Input:  `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [user_agent]","header":{"processor_type":"user_agent"}}],"type":"parse_exception","reason":"No processor type exists with name [user_agent]","header":{"processor_type":"user_agent"}},"status":400}`,
			Output: "This module requires the ingest-user-agent plugin to be installed in Elasticsearch. You can installing using the following command in the Elasticsearch home directory:\n sudo bin/elasticsearch-plugin install ingest-user-agent",
		},
		{
			Test:  "other plugin not installed",
			Input: `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}}],"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}},"status":400}`,
			Output: "This module requires an Elasticsearch plugin that provides the hello_test processor. " +
				"Please visit the Elasticsearch documentation for instructions on how to install this plugin. " +
				"Response body: " + `{"error":{"root_cause":[{"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}}],"type":"parse_exception","reason":"No processor type exists with name [hello_test]","header":{"processor_type":"hello_test"}},"status":400}`,
		},
		{
			Test:   "Elasticsearch 2.4",
			Input:  `{"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"}],"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"},"status":400}`,
			Output: `The Ingest Node functionality seems to be missing from Elasticsearch. The Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"}],"type":"invalid_index_name_exception","reason":"Invalid index name [_ingest], must not start with '_'","index":"_ingest"},"status":400}`,
		},
		{
			Test:   "Elasticsearch 1.7",
			Input:  `{"error":"InvalidIndexNameException[[_ingest] Invalid index name [_ingest], must not start with '_']","status":400}`,
			Output: `The Filebeat modules require Elasticsearch >= 5.0. This is the response I got from Elasticsearch: {"error":"InvalidIndexNameException[[_ingest] Invalid index name [_ingest], must not start with '_']","status":400}`,
		},
	}

	for _, test := range tests {
		errResult := interpretError(errors.New("test"), []byte(test.Input))
		// assert.Equal takes (t, expected, actual); the original call had
		// them swapped, which produces misleading failure diffs.
		assert.Equal(t, test.Output, errResult.Error(), test.Test)
	}
}
@@ -10,7 +10,7 @@ import (
// DashboardLoader is a subset of the Elasticsearch client API capable of
// loading the dashboards.
type DashboardLoader interface {
	// LoadJSON PUTs the given JSON body to path. On failure it also returns
	// the raw response body so the caller can report a useful error.
	LoadJSON(path string, json map[string]interface{}) (error, []byte)
	CreateIndex(index string, body interface{}) (int, *elasticsearch.QueryResult, error)
}

@@ -105,9 +105,9 @@ func (imp Importer) ImportJSONFile(fileType string, file string) error {
json.Unmarshal(reader, &jsonContent)
fileBase := strings.TrimSuffix(filepath.Base(file), filepath.Ext(file))

err = imp.client.LoadJSON(path+"/"+fileBase, jsonContent)
err, body := imp.client.LoadJSON(path+"/"+fileBase, jsonContent)
if err != nil {
return fmt.Errorf("Failed to load %s under %s/%s: %s", file, path, fileBase, err)
return fmt.Errorf("Failed to load %s under %s/%s: %s. Response body: %s", file, path, fileBase, err, body)
}

return nil
@@ -271,7 +271,7 @@ func (imp Importer) ImportSearch(file string) error {
path := "/" + imp.cfg.KibanaIndex + "/search/" + searchName
imp.statusMsg("Import search %s", file)

if err = imp.client.LoadJSON(path, searchContent); err != nil {
if err, _ = imp.client.LoadJSON(path, searchContent); err != nil {
return err
}

@@ -301,7 +301,7 @@ func (imp Importer) ImportIndex(file string) error {
path := "/" + imp.cfg.KibanaIndex + "/index-pattern/" + indexName
imp.statusMsg("Import index to %s from %s\n", path, file)

if err = imp.client.LoadJSON(path, indexContent); err != nil {
if err, _ = imp.client.LoadJSON(path, indexContent); err != nil {
return err
}
return nil
@@ -584,24 +584,24 @@ func (client *Client) PublishEvent(data outputs.Data) error {
// LoadTemplate loads the given template into Elasticsearch under
// /_template/<templateName>. On failure the error includes the raw
// response body returned by Elasticsearch.
func (client *Client) LoadTemplate(templateName string, template map[string]interface{}) error {
	path := "/_template/" + templateName
	err, body := client.LoadJSON(path, template)
	if err != nil {
		return fmt.Errorf("couldn't load template: %v. Response body: %s", err, body)
	}
	logp.Info("Elasticsearch template with name '%s' loaded", templateName)
	return nil
}

func (client *Client) LoadJSON(path string, json map[string]interface{}) error {
status, _, err := client.Request("PUT", path, "", nil, json)
func (client *Client) LoadJSON(path string, json map[string]interface{}) (error, []byte) {
status, body, err := client.Request("PUT", path, "", nil, json)
if err != nil {
return fmt.Errorf("couldn't load json. Error: %s", err)
return fmt.Errorf("couldn't load json. Error: %s", err), body
}
if status > 300 {
return fmt.Errorf("couldn't load json. Status: %v", status)
return fmt.Errorf("couldn't load json. Status: %v", status), body
}

return nil
return nil, []byte{}
}

// CheckTemplate checks if a given template already exist. It returns true if
@@ -718,15 +718,16 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error)
defer closing(resp.Body)

status := resp.StatusCode
var retErr error
if status >= 300 {
return status, nil, fmt.Errorf("%v", resp.Status)
retErr = fmt.Errorf("%v", resp.Status)
}

obj, err := ioutil.ReadAll(resp.Body)
if err != nil {
return status, nil, err
}
return status, obj, nil
return status, obj, retErr
}

func closing(c io.Closer) {

0 comments on commit 595dd48

Please sign in to comment.
You can’t perform that action at this time.