Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load the ES template on connect. #1381

Merged
merged 1 commit into from Apr 19, 2016
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

Load the ES template on connect.

It used to try loading it only once on init, causing bug #1321.
This change moves the call to loadTemplate at connection time, immediately
after successful connection. This has the effect that if overwrite is true,
the template will be loaded on each new established connection.

The template is read on init time and sent to Elasticsearch at connect time.
This means that if the template path is wrong, it will be discovered at
startup (including `-configtest`).

In case there is an error loading the template, the Connect call fails.

This commit includes an integration test for the behaviour.
  • Loading branch information...
tsg committed Apr 13, 2016
commit bc53cdbe253db8cd934b20149e3ec89d3590763c
@@ -37,6 +37,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d
*Affecting all Beats*
- Drain response buffers when pipelining is used by redis output. {pull}1353[1353]
- Unterminated environment variable expressions in config files will now cause an error {pull}1389[1389]
- Fix issue with the automatic template loading when Elasticsearch is not available on Beat start. {issue}1321[1321]

*Packetbeat*

@@ -0,0 +1 @@
sdasda
@@ -50,7 +50,7 @@ func TestOneHostSuccessResp(t *testing.T) {

server := ElasticsearchMock(200, expectedResp)

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
@@ -79,7 +79,7 @@ func TestOneHost500Resp(t *testing.T) {

server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)
err := client.Connect(1 * time.Second)
if err != nil {
t.Fatalf("Failed to connect: %v", err)
@@ -114,7 +114,7 @@ func TestOneHost503Resp(t *testing.T) {

server := ElasticsearchMock(503, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
@@ -36,7 +36,7 @@ func GetTestingElasticsearch() *Client {
var address = "http://" + GetEsHost() + ":" + GetEsPort()
username := os.Getenv("ES_USER")
pass := os.Getenv("ES_PASS")
return NewClient(address, "", nil, nil, username, pass, nil)
return NewClient(address, "", nil, nil, username, pass, nil, nil)
}

func GetValidQueryResult() QueryResult {
@@ -42,7 +42,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {

server := ElasticsearchMock(200, expectedResp)

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
@@ -83,7 +83,7 @@ func TestOneHost500Resp_Bulk(t *testing.T) {

server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
@@ -125,7 +125,7 @@ func TestOneHost503Resp_Bulk(t *testing.T) {

server := ElasticsearchMock(503, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
@@ -35,13 +35,16 @@ type Client struct {
json jsonReader
}

type connectCallback func(client *Client) error

type Connection struct {
URL string
Username string
Password string

http *http.Client
connected bool
http *http.Client
connected bool
onConnectCallback func() error
}

var (
@@ -61,6 +64,7 @@ func NewClient(
esURL, index string, proxyURL *url.URL, tls *tls.Config,
username, password string,
params map[string]string,
onConnectCallback connectCallback,
) *Client {
proxy := http.ProxyFromEnvironment
if proxyURL != nil {
@@ -82,6 +86,13 @@ func NewClient(
index: index,
params: params,
}

client.Connection.onConnectCallback = func() error {
if onConnectCallback != nil {
return onConnectCallback(client)
}
return nil
}
return client
}

@@ -424,6 +435,11 @@ func (conn *Connection) Connect(timeout time.Duration) error {
if !conn.connected {
return ErrNotConnected
}

err = conn.onConnectCallback()
if err != nil {
return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %v", err)
}
return nil
}

@@ -10,6 +10,8 @@ import (
"io/ioutil"
"path/filepath"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs"
"github.com/stretchr/testify/assert"
)

@@ -29,7 +31,7 @@ func TestCheckTemplate(t *testing.T) {
assert.Nil(t, err)

// Check for non existant template
assert.False(t, client.CheckTemplate("libbeat"))
assert.False(t, client.CheckTemplate("libbeat-notexists"))
}

func TestLoadTemplate(t *testing.T) {
@@ -129,3 +131,58 @@ func TestLoadBeatsTemplate(t *testing.T) {
assert.False(t, client.CheckTemplate(templateName))
}
}

// TestOutputLoadTemplate checks that the template is inserted before
// the first event is published.
func TestOutputLoadTemplate(t *testing.T) {

client := GetTestingElasticsearch()
err := client.Connect(5 * time.Second)
if err != nil {
t.Fatal(err)
}

// delete template if it exists
client.request("DELETE", "/_template/libbeat", nil, nil)

// Make sure template is not yet there
assert.False(t, client.CheckTemplate("libbeat"))

tPath, err := filepath.Abs("../../../topbeat/topbeat.template.json")
if err != nil {
t.Fatal(err)
}
config := map[string]interface{}{
"hosts": GetEsHost(),
"template": map[string]interface{}{
"name": "libbeat",
"path": tPath,
},
}

cfg, err := common.NewConfigFrom(config)
if err != nil {
t.Fatal(err)
}

output, err := New(cfg, 0)
if err != nil {
t.Fatal(err)
}
event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"host": "test-host",
"type": "libbeat",
"message": "Test message from libbeat",
}

err = output.PublishEvent(nil, outputs.Options{Guaranteed: true}, event)
if err != nil {
t.Fatal(err)
}

// Guaranteed publish, so the template should be there

assert.True(t, client.CheckTemplate("libbeat"))

}
@@ -4,9 +4,11 @@ import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"io/ioutil"
"net/url"
"strings"
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
@@ -20,6 +22,9 @@ type elasticsearchOutput struct {
index string
mode mode.ConnectionMode
topology

templateContents []byte
templateMutex sync.Mutex
}

func init() {
@@ -69,7 +74,12 @@ func (out *elasticsearchOutput) init(
return err
}

clients, err := mode.MakeClients(cfg, makeClientFactory(tlsConfig, &config))
err = out.readTemplate(config.Template)
if err != nil {
return err
}

clients, err := mode.MakeClients(cfg, makeClientFactory(tlsConfig, &config, out))
if err != nil {
return err
}
@@ -91,8 +101,6 @@ func (out *elasticsearchOutput) init(
return err
}

loadTemplate(config.Template, clients)

if config.SaveTopology {
err := out.EnableTTL()
if err != nil {
@@ -122,52 +130,56 @@ func (out *elasticsearchOutput) init(
return nil
}

// loadTemplate checks if the index mapping template should be loaded
// In case template loading is enabled, template is written to index
func loadTemplate(config Template, clients []mode.ProtocolClient) {
// Check if template should be loaded
// Not being able to load the template will output an error but will not stop execution
if config.Name != "" && len(clients) > 0 {

// Always takes the first client
esClient := clients[0].(*Client)

// readTemplates reads the ES mapping template from the disk, if configured.
func (out *elasticsearchOutput) readTemplate(config Template) error {
if len(config.Name) > 0 {
// Look for the template in the configuration path, if it's not absolute
templatePath := paths.Resolve(paths.Config, config.Path)

logp.Info("Loading template enabled. Trying to load template: %v", templatePath)
logp.Info("Loading template enabled. Reading template file: %v", templatePath)

exists := esClient.CheckTemplate(config.Name)
var err error
out.templateContents, err = ioutil.ReadFile(templatePath)
if err != nil {
return fmt.Errorf("Error loading template %s: %v", templatePath, err)
}
}
return nil
}

// Check if template already exist or should be overwritten
if !exists || config.Overwrite {
// loadTemplate checks if the index mapping template should be loaded
// In case the template is not already loaded or overwritting is enabled, the
// template is written to index
func (out *elasticsearchOutput) loadTemplate(config Template, client *Client) error {
out.templateMutex.Lock()
defer out.templateMutex.Unlock()

if config.Overwrite {
logp.Info("Existing template will be overwritten, as overwrite is enabled.")
}
logp.Info("Trying to load template for client: %s", client)

// Load template from file
content, err := ioutil.ReadFile(templatePath)
if err != nil {
logp.Err("Could not load template from file path: %s; Error: %s", templatePath, err)
} else {
reader := bytes.NewReader(content)
err = esClient.LoadTemplate(config.Name, reader)
// Check if template already exist or should be overwritten
exists := client.CheckTemplate(config.Name)
if !exists || config.Overwrite {

if err != nil {
logp.Err("Could not load template: %v", err)
}
}
} else {
logp.Info("Template already exists and will not be overwritten.")
if config.Overwrite {
logp.Info("Existing template will be overwritten, as overwrite is enabled.")
}

reader := bytes.NewReader(out.templateContents)
err := client.LoadTemplate(config.Name, reader)
if err != nil {
return fmt.Errorf("Could not load template: %v", err)
}
} else {
logp.Info("Template already exists and will not be overwritten.")
}

return nil
}

func makeClientFactory(
tls *tls.Config,
config *elasticsearchConfig,
out *elasticsearchOutput,
) func(string) (mode.ProtocolClient, error) {
return func(host string) (mode.ProtocolClient, error) {
esURL, err := getURL(config.Protocol, config.Path, host)
@@ -196,10 +208,19 @@ func makeClientFactory(
if len(params) == 0 {
params = nil
}

// define a callback to be called on connection
var onConnected connectCallback
if len(out.templateContents) > 0 {
onConnected = func(client *Client) error {
return out.loadTemplate(config.Template, client)
}
}

client := NewClient(
esURL, config.Index, proxyURL, tls,
config.Username, config.Password,
params)
params, onConnected)
return client, nil
}
}
@@ -72,7 +72,7 @@ func esConnect(t *testing.T, index string) *esConnection {

username := os.Getenv("ES_USER")
password := os.Getenv("ES_PASS")
client := elasticsearch.NewClient(host, "", nil, nil, username, password, nil)
client := elasticsearch.NewClient(host, "", nil, nil, username, password, nil, nil)

// try to drop old index if left over from failed test
_, _, _ = client.Delete(index, "", "", nil) // ignore error
@@ -0,0 +1 @@
{"template": true}
@@ -0,0 +1 @@
{"template": true}
@@ -46,9 +46,13 @@ def test_config_test(self):
"""
shutil.copy("../../etc/libbeat.yml",
os.path.join(self.working_dir, "libbeat.yml"))
with open(self.working_dir + "/beatname.template.json", "w") as f:
f.write('{"template": true}')

exit_code = self.run_beat(config="libbeat.yml",
extra_args=["-configtest"])
exit_code = self.run_beat(
config="libbeat.yml",
extra_args=["-configtest",
"-path.config", self.working_dir])

assert exit_code == 0
assert self.log_contains("Config OK") is True
@@ -70,8 +74,8 @@ def test_version(self):

assert self.log_contains("error loading config file") is False

with open(os.path.join(self.working_dir, "mockbeat.log"), "wb") \
as outputfile:
with open(os.path.join(self.working_dir, "mockbeat.log"), "wb") \
as outputfile:
proc = subprocess.Popen(args,
stdout=outputfile,
stderr=subprocess.STDOUT)
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.