Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate that required functionality in Elasticsearch is available upon initial connection #25351

Merged
merged 11 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add new setting `gc_percent` for tuning the garbage collector limits via configuration file. {pull}25394[25394]
- Add `unit` and `metric_type` properties to fields.yml for populating field metadata in Elasticsearch templates {pull}25419[25419]
- Add new option `suffix` to `logging.files` to control how log files are rotated. {pull}25464[25464]
- Validate that required functionality in Elasticsearch is available upon initial connection. {pull}25351[25351]

*Auditbeat*

Expand Down
6 changes: 6 additions & 0 deletions libbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/cmd/platformcheck"
"github.com/elastic/beats/v7/libbeat/licenser"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
)

func init() {
Expand Down Expand Up @@ -57,6 +59,10 @@ type BeatsRootCmd struct {
// run command, which will be called if no args are given (for backwards compatibility),
// and beat settings
func GenRootCmdWithSettings(beatCreator beat.Creator, settings instance.Settings) *BeatsRootCmd {
// Add global Elasticsearch license endpoint check.
// Check we are actually talking with Elasticsearch, to ensure that used features actually exist.
elasticsearch.RegisterGlobalCallback(licenser.FetchAndVerify)

if err := platformcheck.CheckNativePlatformCompat(); err != nil {
fmt.Fprintf(os.Stderr, "Failed to initialize: %v\n", err)
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package licenser

Expand All @@ -9,8 +22,6 @@ import (
"fmt"
"math/rand"
"net/http"
"strconv"
"time"

"github.com/pkg/errors"

Expand All @@ -26,46 +37,6 @@ var params = map[string]string{
"human": "false",
}

// UnmarshalJSON takes a bytes array and convert it to the appropriate license type.
func (t *LicenseType) UnmarshalJSON(b []byte) error {
if len(b) <= 2 {
return fmt.Errorf("invalid string for license type, received: '%s'", string(b))
}
s := string(b[1 : len(b)-1])
if license, ok := licenseLookup[s]; ok {
*t = license
return nil
}

return fmt.Errorf("unknown license type, received: '%s'", s)
}

// UnmarshalJSON takes a bytes array and convert it to the appropriate state.
func (st *State) UnmarshalJSON(b []byte) error {
// we are only interested in the content between the quotes.
if len(b) <= 2 {
return fmt.Errorf("invalid string for state, received: '%s'", string(b))
}

s := string(b[1 : len(b)-1])
if state, ok := stateLookup[s]; ok {
*st = state
return nil
}
return fmt.Errorf("unknown state, received: '%s'", s)
}

// UnmarshalJSON takes a bytes array and transform the int64 to a golang time.
func (et *expiryTime) UnmarshalJSON(b []byte) error {
ts, err := strconv.ParseInt(string(b), 0, 64)
if err != nil {
return errors.Wrap(err, "could not parse value for expiry time")
}

*et = expiryTime(time.Unix(0, int64(time.Millisecond)*int64(ts)).UTC())
return nil
}

type esclient interface {
Request(
method,
Expand All @@ -91,36 +62,23 @@ func NewElasticFetcher(client esclient) *ElasticFetcher {
// Fetch retrieves the license information from an Elasticsearch Client, it will call the `_license`
// endpoint and will return a parsed license. If the `_license` endpoint is unreacheable we will
// return the OSS License otherwise we return an error.
func (f *ElasticFetcher) Fetch() (*License, error) {
func (f *ElasticFetcher) Fetch() (License, error) {
status, body, err := f.client.Request("GET", licenseURL, "", params, nil)
// When we are running an OSS release of elasticsearch the _license endpoint will return a 405,
// "Method Not Allowed", so we return the default OSS license.
if status == http.StatusBadRequest {
f.log.Debug("Received 'Bad request' (400) response from server, fallback to OSS license")
return OSSLicense, nil
}

if status == http.StatusMethodNotAllowed {
f.log.Debug("Received 'Method Not allowed' (405) response from server, fallback to OSS license")
return OSSLicense, nil
}

if status == http.StatusUnauthorized {
return nil, errors.New("unauthorized access, could not connect to the xpack endpoint, verify your credentials")
return License{}, errors.New("unauthorized access, could not connect to the xpack endpoint, verify your credentials")
}

if err != nil {
return nil, errors.Wrap(err, "could not retrieve the license information from the cluster")
return License{}, err
}

if status != http.StatusOK {
return nil, fmt.Errorf("error from server, response code: %d", status)
return License{}, fmt.Errorf("error from server, response code: %d", status)
}

license, err := f.parseJSON(body)
if err != nil {
f.log.Debugw("Invalid response from server", "body", string(body))
return nil, errors.Wrap(err, "could not extract license information from the server response")
return License{}, fmt.Errorf("failed to parse /_license response: %w", err)
}

return license, nil
Expand All @@ -131,16 +89,12 @@ type xpackResponse struct {
License License `json:"license"`
}

func (f *ElasticFetcher) parseJSON(b []byte) (*License, error) {
info := &xpackResponse{}

if err := json.Unmarshal(b, info); err != nil {
return nil, err
func (f *ElasticFetcher) parseJSON(b []byte) (License, error) {
var info xpackResponse
if err := json.Unmarshal(b, &info); err != nil {
return License{}, err
}

license := info.License

return &license, nil
return info.License, nil
}

// esClientMux is taking care of round robin request over an array of elasticsearch client, note that
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build integration

Expand Down Expand Up @@ -45,9 +58,7 @@ func TestElasticsearch(t *testing.T) {
return
}

assert.Equal(t, Trial, license.Get())
assert.Equal(t, Trial, license.Type)
assert.Equal(t, Basic, license.Type)
assert.Equal(t, Active, license.Status)

assert.NotEmpty(t, license.UUID)
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package licenser

import (
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -46,12 +58,8 @@ func TestParseJSON(t *testing.T) {
defer c.Close()

fetcher := NewElasticFetcher(c)
oss, err := fetcher.Fetch()
if assert.NoError(t, err) {
return
}

assert.Equal(t, OSSLicense, oss)
_, err := fetcher.Fetch()
assert.Error(t, err)
})

t.Run("OSS release of Elasticsearch (Code: 400)", func(t *testing.T) {
Expand All @@ -63,12 +71,8 @@ func TestParseJSON(t *testing.T) {
defer c.Close()

fetcher := NewElasticFetcher(c)
oss, err := fetcher.Fetch()
if assert.NoError(t, err) {
return
}

assert.Equal(t, OSSLicense, oss)
_, err := fetcher.Fetch()
assert.Error(t, err)
})

t.Run("malformed JSON", func(t *testing.T) {
Expand Down Expand Up @@ -144,32 +148,4 @@ func TestParseJSON(t *testing.T) {
return nil
})
})

t.Run("parse milliseconds", func(t *testing.T) {
t.Run("invalid", func(t *testing.T) {
b := []byte("{ \"v\": \"\"}")
ts := struct {
V expiryTime `json:"v"`
}{}

err := json.Unmarshal(b, &ts)
assert.Error(t, err)
})

t.Run("valid", func(t *testing.T) {
b := []byte("{ \"v\": 1538060781728 }")
ts := struct {
V expiryTime `json:"v"`
}{}

err := json.Unmarshal(b, &ts)
if !assert.NoError(t, err) {
return
}

// 2018-09-27 15:06:21.728 +0000 UTC
d := time.Date(2018, 9, 27, 15, 6, 21, 728000000, time.UTC).Sub((time.Time(ts.V)))
assert.Equal(t, time.Duration(0), d)
})
})
}
45 changes: 45 additions & 0 deletions libbeat/licenser/es_callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package licenser

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/logp"
)

// Verify checks if the connection endpoint is really Elasticsearch.
func FetchAndVerify(client *eslegclient.Connection) error {
// Logger created earlier than this place are at risk of discarding any log statement.
log := logp.NewLogger("elasticsearch")

fetcher := NewElasticFetcher(client)
license, err := fetcher.Fetch()
if err != nil {
return fmt.Errorf("could not connect to a compatible version of Elasticsearch: %w", err)
}

// Only notify users if they have an Elasticsearch license that has been expired.
// We still will continue publish events as usual.
if IsExpired(license) {
log.Warn("Elasticsearch license is not active, please check Elasticsearch's licensing information at https://www.elastic.co/subscriptions.")
}

return nil
}
Loading