Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use _doc if ES major version is 7 #9056

Merged
merged 26 commits into from Nov 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -57,6 +57,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha1...master[Check the HEAD d

*Affecting all Beats*
- Unify dashboard exporter tools. {pull}9097[9097]
- Use _doc as document type of the Elasticsearch major version is 7. {pull}9056[9056]
urso marked this conversation as resolved.
Show resolved Hide resolved

*Auditbeat*

Expand Down
21 changes: 7 additions & 14 deletions filebeat/beater/filebeat.go
Expand Up @@ -221,12 +221,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
return errors.Errorf("Error creating Kibana client: %v", err)
}

kibanaVersion, err := common.NewVersion(kibanaClient.GetVersion())
urso marked this conversation as resolved.
Show resolved Hide resolved
urso marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Errorf("Error checking Kibana version: %v", err)
}

if err := setupMLBasedOnVersion(fb.moduleRegistry, esClient, kibanaClient, kibanaVersion); err != nil {
if err := setupMLBasedOnVersion(fb.moduleRegistry, esClient, kibanaClient); err != nil {
errs = append(errs, err)
}

Expand All @@ -252,7 +247,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
continue
}

if err := setupMLBasedOnVersion(set, esClient, kibanaClient, kibanaVersion); err != nil {
if err := setupMLBasedOnVersion(set, esClient, kibanaClient); err != nil {
errs = append(errs, err)
}

Expand All @@ -262,18 +257,16 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
return errs.Err()
}

func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client, kibanaVersion *common.Version) error {
if isElasticsearchLoads(kibanaVersion) {
func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client) error {
if isElasticsearchLoads(kibanaClient.GetVersion()) {
urso marked this conversation as resolved.
Show resolved Hide resolved
return reg.LoadML(esClient)
}
return reg.SetupML(esClient, kibanaClient)
}

func isElasticsearchLoads(kibanaVersion *common.Version) bool {
if kibanaVersion.Major < 6 || kibanaVersion.Major == 6 && kibanaVersion.Minor < 1 {
return true
}
return false
func isElasticsearchLoads(kibanaVersion common.Version) bool {
return kibanaVersion.Major < 6 ||
(kibanaVersion.Major == 6 && kibanaVersion.Minor < 1)
}

// Run allows the beater to be run as a beat.
Expand Down
16 changes: 8 additions & 8 deletions filebeat/fileset/fileset.go
Expand Up @@ -25,6 +25,7 @@ package fileset
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -194,15 +195,14 @@ func (fs *Fileset) evaluateVars() (map[string]interface{}, error) {

// turnOffElasticsearchVars re-evaluates the variables that have `min_elasticsearch_version`
// set.
func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion string) (map[string]interface{}, error) {
func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion common.Version) (map[string]interface{}, error) {
retVars := map[string]interface{}{}
for key, val := range vars {
retVars[key] = val
}

haveVersion, err := common.NewVersion(esVersion)
if err != nil {
return vars, fmt.Errorf("Error parsing version %s: %v", esVersion, err)
if !esVersion.IsValid() {
return vars, errors.New("Unknown Elasticsearch version")
}

for _, vals := range fs.manifest.Vars {
Expand All @@ -219,11 +219,11 @@ func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersi
return vars, fmt.Errorf("Error parsing version %s: %v", minESVersion["version"].(string), err)
}

logp.Debug("fileset", "Comparing ES version %s with requirement of %s", haveVersion, minVersion)
logp.Debug("fileset", "Comparing ES version %s with requirement of %s", esVersion.String(), minVersion)

if haveVersion.LessThan(minVersion) {
if esVersion.LessThan(minVersion) {
retVars[name] = minESVersion["value"]
logp.Info("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], haveVersion)
logp.Info("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], esVersion.String())
}
}
}
Expand Down Expand Up @@ -360,7 +360,7 @@ func (fs *Fileset) getPipelineID(beatVersion string) (string, error) {
}

// GetPipeline returns the JSON content of the Ingest Node pipeline that parses the logs.
func (fs *Fileset) GetPipeline(esVersion string) (pipelineID string, content map[string]interface{}, err error) {
func (fs *Fileset) GetPipeline(esVersion common.Version) (pipelineID string, content map[string]interface{}, err error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false)
if err != nil {
return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
Expand Down
53 changes: 30 additions & 23 deletions filebeat/fileset/fileset_test.go
Expand Up @@ -27,7 +27,9 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

Expand Down Expand Up @@ -213,7 +215,8 @@ func TestGetPipelineNginx(t *testing.T) {
fs := getModuleForTesting(t, "nginx", "access")
assert.NoError(t, fs.Read("5.2.0"))

pipelineID, content, err := fs.GetPipeline("5.2.0")
version := common.MustNewVersion("5.2.0")
pipelineID, content, err := fs.GetPipeline(*version)
assert.NoError(t, err)
assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipelineID)
assert.Contains(t, content, "description")
Expand All @@ -234,27 +237,31 @@ func TestGetPipelineConvertTS(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, fs.Read("6.1.0"))

// ES 6.0.0 should not have beat.timezone referenced
pipelineID, content, err := fs.GetPipeline("6.0.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
marshaled, err := json.Marshal(content)
assert.NoError(t, err)
assert.NotContains(t, string(marshaled), "beat.timezone")

// ES 6.1.0 should have beat.timezone referenced
pipelineID, content, err = fs.GetPipeline("6.1.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
marshaled, err = json.Marshal(content)
assert.NoError(t, err)
assert.Contains(t, string(marshaled), "beat.timezone")
cases := map[string]struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much nicer 👍

Beat string
Timezone bool
}{
"6.0.0": {Timezone: false},
"6.1.0": {Timezone: true},
"6.2.0": {Timezone: true},
}

// ES 6.2.0 should have beat.timezone referenced
pipelineID, content, err = fs.GetPipeline("6.2.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
marshaled, err = json.Marshal(content)
assert.NoError(t, err)
assert.Contains(t, string(marshaled), "beat.timezone")
for esVersion, cfg := range cases {
pipelineName := "filebeat-6.1.0-system-syslog-pipeline"

t.Run(fmt.Sprintf("es=%v", esVersion), func(t *testing.T) {
ver := common.MustNewVersion(esVersion)
pipelineID, content, err := fs.GetPipeline(*ver)
require.NoError(t, err)
assert.Equal(t, pipelineName, pipelineID)

marshaled, err := json.Marshal(content)
require.NoError(t, err)
if cfg.Timezone {
assert.Contains(t, string(marshaled), "beat.timezone")
} else {
assert.NotContains(t, string(marshaled), "beat.timezone")
}
})
}
}
9 changes: 1 addition & 8 deletions filebeat/fileset/modules_integration_test.go
Expand Up @@ -22,7 +22,6 @@ package fileset
import (
"encoding/json"
"path/filepath"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -142,11 +141,5 @@ func TestAvailableProcessors(t *testing.T) {

func hasIngest(client *elasticsearch.Client) bool {
v := client.GetVersion()
majorVersion := string(v[0])
version, err := strconv.Atoi(majorVersion)
if err != nil {
return true
}

return version >= 5
return v.Major >= 5
}
3 changes: 2 additions & 1 deletion filebeat/fileset/pipelines.go
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

Expand All @@ -33,7 +34,7 @@ type PipelineLoaderFactory func() (PipelineLoader, error)
type PipelineLoader interface {
LoadJSON(path string, json map[string]interface{}) ([]byte, error)
Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
GetVersion() string
GetVersion() common.Version
}

// LoadPipelines loads the pipelines for each configured fileset.
Expand Down
2 changes: 1 addition & 1 deletion filebeat/scripts/tester/main.go
Expand Up @@ -264,7 +264,7 @@ func runSimulate(url string, pipeline map[string]interface{}, logs []string, ver
for _, s := range sources {
d := common.MapStr{
"_index": "index",
"_type": "doc",
"_type": "_doc",
"_id": "id",
"_source": s,
}
Expand Down
1 change: 1 addition & 0 deletions libbeat/cmd/export/dashboard.go
Expand Up @@ -73,6 +73,7 @@ func GenDashboardCmd(name, idxPrefix, beatVersion string) *cobra.Command {
if decode {
r = dashboards.DecodeExported(r)
}

err = dashboards.SaveToFile(r, info.Dashboards[i].File, filepath.Dir(yml), client.GetVersion())
if err != nil {
fmt.Fprintf(os.Stderr, "Error saving dashboard '%s' to file '%s' : %+v\n",
Expand Down
11 changes: 10 additions & 1 deletion libbeat/cmd/export/template.go
Expand Up @@ -57,7 +57,16 @@ func GenTemplateConfigCmd(settings instance.Settings, name, idxPrefix, beatVersi
}
}

tmpl, err := template.New(b.Info.Version, index, version, cfg)
if version == "" {
version = b.Info.Version
}

esVersion, err := common.NewVersion(version)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid Elasticsearch version: %s\n", err)
}

tmpl, err := template.New(b.Info.Version, index, *esVersion, cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Error generating template: %+v", err)
os.Exit(1)
Expand Down
15 changes: 15 additions & 0 deletions libbeat/common/version.go
Expand Up @@ -31,6 +31,16 @@ type Version struct {
Meta string
}

// MustNewVersion creates a version from the given version string.
// If the version string is invalid, MustNewVersion panics.
func MustNewVersion(version string) *Version {
v, err := NewVersion(version)
if err != nil {
panic(err)
}
return v
}

// NewVersion expects a string in the format:
// major.minor.bugfix(-meta)
func NewVersion(version string) (*Version, error) {
Expand Down Expand Up @@ -69,6 +79,11 @@ func NewVersion(version string) (*Version, error) {
return &v, nil
}

// IsValid returns true if the version object stores a successfully parsed version number.
func (v *Version) IsValid() bool {
urso marked this conversation as resolved.
Show resolved Hide resolved
return v.version != ""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be called IsEmpty instead?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't care about the name. stdlib seems to IsZero() bool for some types.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping IsValid cause of docstring:

IsValid returns true if the version object stores a successfully parsed version number.

The NewVersion constructor is no simple constructor, but actually a parser. Wonder why we parse ourselves. github.com/hashicorp/go-version is a pretty good and flexible versions package:

}

func (v *Version) IsMajor(major int) bool {
return major == v.Major
}
Expand Down
60 changes: 9 additions & 51 deletions libbeat/dashboards/dashboards.go
Expand Up @@ -22,8 +22,6 @@ import (
"errors"
"fmt"
"path/filepath"
"strconv"
"strings"

errw "github.com/pkg/errors"

Expand Down Expand Up @@ -106,12 +104,7 @@ func ImportDashboards(

esLoader.statusMsg("Elasticsearch URL %v", esLoader.client.Connection.URL)

majorVersion, _, err := getMajorAndMinorVersion(esLoader.version)
if err != nil {
return fmt.Errorf("wrong Elasticsearch version: %v", err)
}

if majorVersion < 6 {
if esLoader.version.Major < 6 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR but I wonder if we should remove support for this.

@jsoriano More related to your other PR with removing 5.x dashboards.

Copy link
Member

@jsoriano jsoriano Nov 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can consider to keep this in libbeat in case some community beat still wants to try to keep support for 5.x.

If not, I can remove it in in #8927, along with the importViaES method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather remove it to reduce complexity in the code. Can also be done in a follow up PR to keep things smaller.

importVia = importViaES
} else {
importVia = useKibana
Expand Down Expand Up @@ -145,17 +138,16 @@ func setupAndImportDashboardsViaKibana(ctx context.Context, hostname string, kib
}

func ImportDashboardsViaKibana(kibanaLoader *KibanaLoader) error {

if !isKibanaAPIavailable(kibanaLoader.version) {
return fmt.Errorf("Kibana API is not available in Kibana version %s", kibanaLoader.version)
version := kibanaLoader.version
if !version.IsValid() {
return errors.New("No valid kibana version available")
}

version, err := common.NewVersion(kibanaLoader.version)
if err != nil {
return fmt.Errorf("Invalid Kibana version: %s", kibanaLoader.version)
if !isKibanaAPIavailable(kibanaLoader.version) {
return fmt.Errorf("Kibana API is not available in Kibana version %s", kibanaLoader.version.String())
}

importer, err := NewImporter(*version, kibanaLoader.config, kibanaLoader)
importer, err := NewImporter(version, kibanaLoader.config, kibanaLoader)
if err != nil {
return fmt.Errorf("fail to create a Kibana importer for loading the dashboards: %v", err)
}
Expand Down Expand Up @@ -187,40 +179,6 @@ func ImportDashboardsViaElasticsearch(esLoader *ElasticsearchLoader) error {
return nil
}

func getMajorAndMinorVersion(version string) (int, int, error) {
fields := strings.Split(version, ".")
if len(fields) != 3 {
return 0, 0, fmt.Errorf("wrong version %s", version)
}
majorVersion := fields[0]
minorVersion := fields[1]

majorVersionInt, err := strconv.Atoi(majorVersion)
if err != nil {
return 0, 0, err
}

minorVersionInt, err := strconv.Atoi(minorVersion)
if err != nil {
return 0, 0, err
}

return majorVersionInt, minorVersionInt, nil
}

func isKibanaAPIavailable(version string) bool {
majorVersion, minorVersion, err := getMajorAndMinorVersion(version)
if err != nil {
return false
}

if majorVersion == 5 && minorVersion >= 6 {
return true
}

if majorVersion >= 6 {
return true
}

return false
func isKibanaAPIavailable(version common.Version) bool {
return (version.Major == 5 && version.Minor >= 6) || version.Major >= 6
}