6 changes: 5 additions & 1 deletion cmd/benchmark.go
@@ -147,10 +147,14 @@ func benchTypeCommandActionFactory(runner benchrunner.BenchRunner) cobraext.Comm
 		return fmt.Errorf("no %s benchmarks found", benchType)
 	}
 
-	esClient, err := elasticsearch.Client()
+	esClient, err := elasticsearch.NewClient()
 	if err != nil {
 		return errors.Wrap(err, "can't create Elasticsearch client")
 	}
+	err = esClient.CheckHealth(cmd.Context())
+	if err != nil {
+		return err
+	}
 
 	var results []*benchrunner.Result
 	for _, folder := range benchFolders {
2 changes: 1 addition & 1 deletion cmd/dump.go
@@ -80,7 +80,7 @@ func dumpInstalledObjectsCmdAction(cmd *cobra.Command, args []string) error {
 	if tlsSkipVerify {
 		clientOptions = append(clientOptions, elasticsearch.OptionWithSkipTLSVerify())
 	}
-	client, err := elasticsearch.Client(clientOptions...)
+	client, err := elasticsearch.NewClient(clientOptions...)
 	if err != nil {
 		return errors.Wrap(err, "failed to initialize Elasticsearch client")
 	}
6 changes: 5 additions & 1 deletion cmd/testrunner.go
@@ -207,10 +207,14 @@ func testTypeCommandActionFactory(runner testrunner.TestRunner) cobraext.Command
 
 	variantFlag, _ := cmd.Flags().GetString(cobraext.VariantFlagName)
 
-	esClient, err := elasticsearch.Client()
+	esClient, err := elasticsearch.NewClient()
 	if err != nil {
 		return errors.Wrap(err, "can't create Elasticsearch client")
 	}
+	err = esClient.CheckHealth(cmd.Context())
+	if err != nil {
+		return err
+	}
 
 	var results []testrunner.TestResult
 	for _, folder := range testFolders {
10 changes: 5 additions & 5 deletions internal/dump/installedobjects_test.go
@@ -64,7 +64,7 @@ type installedObjectsDumpSuite struct
 func (s *installedObjectsDumpSuite) SetupTest() {
 	_, err := os.Stat(s.DumpDir)
 	if errors.Is(err, os.ErrNotExist) {
-		client, err := elasticsearch.Client()
+		client, err := elasticsearch.NewClient()
 		s.Require().NoError(err)
 
 		dumper := NewInstalledObjectsDumper(client.API, s.PackageName)
@@ -77,10 +77,10 @@ func (s *installedObjectsDumpSuite) SetupTest() {
 }
 
 func (s *installedObjectsDumpSuite) TestDumpAll() {
-	client := estest.ElasticsearchClient(s.T(), s.RecordDir)
+	client := estest.NewClient(s.T(), s.RecordDir)
 
 	outputDir := s.T().TempDir()
-	dumper := NewInstalledObjectsDumper(client, s.PackageName)
+	dumper := NewInstalledObjectsDumper(client.API, s.PackageName)
 	n, err := dumper.DumpAll(context.Background(), outputDir)
 	s.Require().NoError(err)
 
@@ -94,8 +94,8 @@ func (s *installedObjectsDumpSuite) TestDumpAll() {
 }
 
 func (s *installedObjectsDumpSuite) TestDumpSome() {
-	client := estest.ElasticsearchClient(s.T(), s.RecordDir)
-	dumper := NewInstalledObjectsDumper(client, s.PackageName)
+	client := estest.NewClient(s.T(), s.RecordDir)
+	dumper := NewInstalledObjectsDumper(client.API, s.PackageName)
 
 	// In a map so order of execution is randomized.
 	dumpers := map[string]func(ctx context.Context, outputDir string) (int, error){
127 changes: 124 additions & 3 deletions internal/elasticsearch/client.go
@@ -5,10 +5,14 @@
 package elasticsearch
 
 import (
+	"context"
 	"crypto/tls"
+	"encoding/json"
 	"fmt"
+	"io"
 	"net/http"
 	"os"
+	"strings"
 
 	"github.com/pkg/errors"
 
@@ -28,6 +32,9 @@ type IngestSimulateRequest = esapi.IngestSimulateRequest
 // IngestGetPipelineRequest configures the Ingest Get Pipeline API request.
 type IngestGetPipelineRequest = esapi.IngestGetPipelineRequest
 
+// ClusterStateRequest configures the Cluster State API request.
+type ClusterStateRequest = esapi.ClusterStateRequest
+
 // clientOptions are used to configure a client.
 type clientOptions struct {
 	address string
@@ -74,8 +81,13 @@ func OptionWithSkipTLSVerify() ClientOption {
 	}
 }
 
-// Client method creates new instance of the Elasticsearch client.
-func Client(customOptions ...ClientOption) (*elasticsearch.Client, error) {
+// Client is a wrapper over an Elasticsearch Client.
+type Client struct {
+	*elasticsearch.Client
+}
+
+// NewClient method creates new instance of the Elasticsearch client.
+func NewClient(customOptions ...ClientOption) (*Client, error) {
 	options := defaultOptionsFromEnv()
 	for _, option := range customOptions {
 		option(&options)
@@ -108,5 +120,114 @@ func Client(customOptions ...ClientOption) (*elasticsearch.Client, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "can't create instance")
 	}
-	return client, nil
+	return &Client{Client: client}, nil
 }
+
+// CheckHealth checks the health of the cluster.
+func (client *Client) CheckHealth(ctx context.Context) error {
+	resp, err := client.Cluster.Health(client.Cluster.Health.WithContext(ctx))
+	if err != nil {
+		return errors.Wrap(err, "error checking cluster health")
+	}
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return errors.Wrap(err, "error reading cluster health response")
+	}
+
+	var clusterHealth struct {
+		Status string `json:"status"`
+	}
+	err = json.Unmarshal(body, &clusterHealth)
+	if err != nil {
+		return errors.Wrap(err, "error decoding cluster health response")
+	}
+
+	if status := clusterHealth.Status; status != "green" && status != "yellow" {
+		if status != "red" {
+			return errors.Errorf("cluster in unhealthy state: %q", status)
+		}
+		cause, err := client.redHealthCause(ctx)
+		if err != nil {
+			return errors.Wrapf(err, "cluster in unhealthy state, failed to identify cause")
+		}
+		return errors.Errorf("cluster in unhealthy state: %s", cause)
+	}
+
+	return nil
+}
+
+// redHealthCause tries to identify the cause of a cluster in red state. This could be
+// also used as a replacement of CheckHealth, but keeping them separated because it uses
+// internal undocumented APIs that might change.
+func (client *Client) redHealthCause(ctx context.Context) (string, error) {
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/_internal/_health", nil)
+	if err != nil {
+		return "", errors.Wrap(err, "error creating internal health request")
+	}
+	resp, err := client.Transport.Perform(req)
+	if err != nil {
+		return "", errors.Wrap(err, "error performing internal health request")
+	}
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return "", errors.Wrap(err, "error reading internal health response")
+	}
+
+	var internalHealth struct {
+		Status     string `json:"status"`
+		Indicators map[string]struct {
+			Status  string `json:"status"`
+			Impacts []struct {
+				Severity int `json:"severity"`
+			} `json:"impacts"`
+			Diagnosis []struct {
+				Cause string `json:"cause"`
+			} `json:"diagnosis"`
+		} `json:"indicators"`
+	}
+	err = json.Unmarshal(body, &internalHealth)
+	if err != nil {
+		return "", errors.Wrap(err, "error decoding internal health response")
+	}
+	if internalHealth.Status != "red" {
+		return "", errors.New("cluster state is not red?")
+	}
+
+	// Only diagnostics with the highest severity impacts are returned.
+	var highestSeverity int
+	var causes []string
+	for _, indicator := range internalHealth.Indicators {
+		if indicator.Status != "red" {
+			continue
+		}
+
+		var severity int
+		for _, impact := range indicator.Impacts {
+			if impact.Severity > severity {
+				severity = impact.Severity
+			}
+		}
+
+		switch {
+		case severity < highestSeverity:
+			continue
+		case severity > highestSeverity:
+			highestSeverity = severity
+			causes = nil
+		case severity == highestSeverity:
+			// Continue appending for current severity.
+		}
+
+		for _, diagnosis := range indicator.Diagnosis {
+			causes = append(causes, diagnosis.Cause)
+		}
+	}
+	if len(causes) == 0 {
+		return "", errors.New("no causes found")
+	}
+	return strings.Join(causes, ", "), nil
+}
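
The command-level changes above all reduce to the same calling pattern: construct the wrapped client, then fail fast if the cluster is unhealthy before doing any real work. Below is a minimal sketch of that pattern; the run function and its wiring are illustrative only, and it assumes the environment variables set up by elastic-package stack, from which NewClient reads its defaults.

package example

import (
	"context"

	"github.com/pkg/errors"

	"github.com/elastic/elastic-package/internal/elasticsearch"
)

// run is a hypothetical caller mirroring what the benchmark and test
// runner commands do after this change.
func run(ctx context.Context) error {
	// NewClient reads its defaults (address, credentials) from the
	// environment prepared by the elastic-package stack.
	esClient, err := elasticsearch.NewClient()
	if err != nil {
		return errors.Wrap(err, "can't create Elasticsearch client")
	}
	// CheckHealth returns nil for green or yellow clusters; for red
	// clusters it reports a diagnosed cause when one can be identified.
	if err := esClient.CheckHealth(ctx); err != nil {
		return err
	}
	// ... proceed with work that requires a healthy cluster ...
	return nil
}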
42 changes: 38 additions & 4 deletions internal/elasticsearch/client_test.go
@@ -2,10 +2,11 @@
 // or more contributor license agreements. Licensed under the Elastic License;
 // you may not use this file except in compliance with the Elastic License.
 
-package elasticsearch
+package elasticsearch_test
 
 import (
 	"bytes"
+	"context"
 	"crypto/x509"
 	"encoding/pem"
 	"net/http"
@@ -16,6 +17,9 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+
+	"github.com/elastic/elastic-package/internal/elasticsearch"
+	"github.com/elastic/elastic-package/internal/elasticsearch/test"
 )
 
 func TestClientWithTLS(t *testing.T) {
@@ -26,30 +30,60 @@ func TestClientWithTLS(t *testing.T) {
 	caCertFile := writeCACertFile(t, server.Certificate())
 
 	t.Run("no TLS config, should fail", func(t *testing.T) {
-		client, err := Client(OptionWithAddress(server.URL))
+		client, err := elasticsearch.NewClient(elasticsearch.OptionWithAddress(server.URL))
 		require.NoError(t, err)
 
 		_, err = client.Ping()
 		assert.Error(t, err)
 	})
 
 	t.Run("with CA", func(t *testing.T) {
-		client, err := Client(OptionWithAddress(server.URL), OptionWithCertificateAuthority(caCertFile))
+		client, err := elasticsearch.NewClient(elasticsearch.OptionWithAddress(server.URL), elasticsearch.OptionWithCertificateAuthority(caCertFile))
 		require.NoError(t, err)
 
 		_, err = client.Ping()
 		assert.NoError(t, err)
 	})
 
 	t.Run("skip TLS verify", func(t *testing.T) {
-		client, err := Client(OptionWithAddress(server.URL), OptionWithSkipTLSVerify())
+		client, err := elasticsearch.NewClient(elasticsearch.OptionWithAddress(server.URL), elasticsearch.OptionWithSkipTLSVerify())
 		require.NoError(t, err)
 
 		_, err = client.Ping()
 		assert.NoError(t, err)
 	})
 }
+
+func TestClusterHealth(t *testing.T) {
+	cases := []struct {
+		RecordDir string
+		Expected  string
+	}{
+		{
+			RecordDir: "./testdata/elasticsearch-8-5-healthy",
+		},
+		{
+			RecordDir: "./testdata/elasticsearch-8-5-red-out-of-disk",
+			Expected:  "cluster in unhealthy state: 33 indices reside on nodes that have run or are likely to run out of disk space, this can temporarily disable writing on these indices.",
+		},
+	}
+
+	for _, c := range cases {
+		t.Run(c.RecordDir, func(t *testing.T) {
+			client := test.NewClient(t, c.RecordDir)
+
+			err := client.CheckHealth(context.Background())
+			if c.Expected != "" {
+				if assert.Error(t, err) {
+					assert.Equal(t, c.Expected, err.Error())
+				}
+			} else {
+				assert.NoError(t, err)
+			}
+		})
+	}
+}
 
 func writeCACertFile(t *testing.T, cert *x509.Certificate) string {
 	var d bytes.Buffer
 	err := pem.Encode(&d, &pem.Block{
10 changes: 5 additions & 5 deletions internal/elasticsearch/test/httptest.go
@@ -18,20 +18,20 @@ import (
 	"github.com/elastic/elastic-package/internal/elasticsearch"
 )
 
-// ElasticsearchClient returns a client for a testing http server that uses prerecorded
+// NewClient returns a client for a testing http server that uses prerecorded
 // responses. If responses are not found, it forwards the query to the server started by
 // elastic-package stack, and records the response.
 // Responses are recorded in the directory indicated by serverDataDir.
-func ElasticsearchClient(t *testing.T, serverDataDir string) *elasticsearch.API {
+func NewClient(t *testing.T, serverDataDir string) *elasticsearch.Client {
 	server := testElasticsearchServer(t, serverDataDir)
 	t.Cleanup(func() { server.Close() })
 
-	client, err := elasticsearch.Client(
+	client, err := elasticsearch.NewClient(
 		elasticsearch.OptionWithAddress(server.URL),
 	)
 	require.NoError(t, err)
 
-	return client.API
+	return client
 }
 
 func testElasticsearchServer(t *testing.T, mockServerDir string) *httptest.Server {
@@ -56,7 +56,7 @@ func pathForURL(url string) string {
 }
 
 func recordRequest(t *testing.T, r *http.Request, path string) {
-	client, err := elasticsearch.Client()
+	client, err := elasticsearch.NewClient()
 	require.NoError(t, err)
 
 	t.Logf("Recording %s in %s", r.URL.Path, path)
1 change: 1 addition & 0 deletions, new recorded response under internal/elasticsearch/testdata/elasticsearch-8-5-healthy
@@ -0,0 +1 @@
+{"cluster_name":"elasticsearch","status":"yellow","timed_out":false,"number_of_nodes":1,"number_of_data_nodes":1,"active_primary_shards":33,"active_shards":33,"relocating_shards":0,"initializing_shards":0,"unassigned_shards":19,"delayed_unassigned_shards":0,"number_of_pending_tasks":0,"number_of_in_flight_fetch":0,"task_max_waiting_in_queue_millis":0,"active_shards_percent_as_number":63.46153846153846}
17 changes: 17 additions & 0 deletions, new recorded response under internal/elasticsearch/testdata/elasticsearch-8-5-healthy
@@ -0,0 +1,17 @@
+{
+  "name" : "6dcb6ee762ec",
+  "cluster_name" : "elasticsearch",
+  "cluster_uuid" : "YhxaHz-aRrKl_rtySRVBoQ",
+  "version" : {
+    "number" : "8.5.0-SNAPSHOT",
+    "build_flavor" : "default",
+    "build_type" : "docker",
+    "build_hash" : "77b936e44234defdde3c7ded0d1ad9ae5e288f77",
+    "build_date" : "2022-10-29T04:11:27.132517622Z",
+    "build_snapshot" : true,
+    "lucene_version" : "9.4.1",
+    "minimum_wire_compatibility_version" : "7.17.0",
+    "minimum_index_compatibility_version" : "7.0.0"
+  },
+  "tagline" : "You Know, for Search"
+}
1 change: 1 addition & 0 deletions, new recorded response under internal/elasticsearch/testdata/elasticsearch-8-5-red-out-of-disk
@@ -0,0 +1 @@
+{"cluster_name":"elasticsearch","status":"red","timed_out":false,"number_of_nodes":1,"number_of_data_nodes":1,"active_primary_shards":33,"active_shards":33,"relocating_shards":0,"initializing_shards":0,"unassigned_shards":20,"delayed_unassigned_shards":0,"number_of_pending_tasks":0,"number_of_in_flight_fetch":0,"task_max_waiting_in_queue_millis":0,"active_shards_percent_as_number":62.264150943396224}
1 change: 1 addition & 0 deletions, new recorded response under internal/elasticsearch/testdata/elasticsearch-8-5-red-out-of-disk
@@ -0,0 +1 @@
+{"status":"red","cluster_name":"elasticsearch","indicators":{"master_is_stable":{"status":"green","symptom":"The cluster has a stable master node","details":{"current_master":{"node_id":"PWBH3euxQn2wZwg0OgeCzQ","name":"008309953ac4"},"recent_masters":[{"node_id":"PWBH3euxQn2wZwg0OgeCzQ","name":"008309953ac4"}]}},"repository_integrity":{"status":"green","symptom":"No snapshot repositories configured."},"shards_availability":{"status":"red","symptom":"This cluster has 1 unavailable primary shard, 19 unavailable replica shards.","details":{"creating_primaries":0,"unassigned_replicas":19,"restarting_primaries":0,"restarting_replicas":0,"initializing_primaries":0,"started_replicas":0,"initializing_replicas":0,"unassigned_primaries":1,"started_primaries":33},"impacts":[{"id":"elasticsearch:health:shards_availability:impact:primary_unassigned","severity":1,"description":"Cannot add data to 1 index [.fleet-actions-7]. Searches might return incomplete results.","impact_areas":["ingest","search"]},{"id":"elasticsearch:health:shards_availability:impact:replica_unassigned","severity":2,"description":"Searches might be slower than usual. Fewer redundant copies of the data exist on 19 indices [.ds-logs-elastic_agent-default-2022.11.25-000001, .ds-logs-elastic_agent.filebeat-default-2022.11.25-000001, .ds-logs-elastic_agent.fleet_server-default-2022.11.25-000001, .ds-logs-elastic_agent.metricbeat-default-2022.11.25-000001, .ds-metrics-elastic_agent.elastic_agent-default-2022.11.25-000001, .ds-metrics-elastic_agent.filebeat-default-2022.11.25-000001, .ds-metrics-elastic_agent.fleet_server-default-2022.11.25-000001, .ds-metrics-elastic_agent.metricbeat-default-2022.11.25-000001, .ds-metrics-system.cpu-default-2022.11.25-000001, .ds-metrics-system.diskio-default-2022.11.25-000001, ...].","impact_areas":["search"]}],"diagnosis":[{"id":"elasticsearch:health:shards_availability:diagnosis:increase_tier_capacity_for_allocations:tier:data_hot","cause":"Elasticsearch isn't allowed to allocate some shards from these indices to any of the nodes in the desired data tier because there are not enough nodes in the [data_hot] tier to allocate each shard copy on a different node.","action":"Increase the number of nodes in this tier or decrease the number of replica shards in the affected indices.","affected_resources":[".ds-logs-elastic_agent-default-2022.11.25-000001",".ds-logs-elastic_agent.filebeat-default-2022.11.25-000001",".ds-logs-elastic_agent.fleet_server-default-2022.11.25-000001",".ds-logs-elastic_agent.metricbeat-default-2022.11.25-000001",".ds-metrics-elastic_agent.elastic_agent-default-2022.11.25-000001",".ds-metrics-elastic_agent.filebeat-default-2022.11.25-000001",".ds-metrics-elastic_agent.fleet_server-default-2022.11.25-000001",".ds-metrics-elastic_agent.metricbeat-default-2022.11.25-000001",".ds-metrics-system.cpu-default-2022.11.25-000001",".ds-metrics-system.diskio-default-2022.11.25-000001",".ds-metrics-system.filesystem-default-2022.11.25-000001",".ds-metrics-system.fsstat-default-2022.11.25-000001",".ds-metrics-system.load-default-2022.11.25-000001",".ds-metrics-system.memory-default-2022.11.25-000001",".ds-metrics-system.network-default-2022.11.25-000001",".ds-metrics-system.process-default-2022.11.25-000001",".ds-metrics-system.process.summary-default-2022.11.25-000001",".ds-metrics-system.socket_summary-default-2022.11.25-000001",".ds-metrics-system.uptime-default-2022.11.25-000001"],"help_url":"http://ela.st/tier-capacity"},{"id":"elasticsearch:health:shards_availability:diagnosis:explain_allocations","cause":"Elasticsearch isn't allowed to allocate some shards from these indices to any of the nodes in the cluster.","action":"Diagnose the issue by calling the allocation explain API for an index [GET _cluster/allocation/explain]. Choose a node to which you expect a shard to be allocated, find this node in the node-by-node explanation, and address the reasons which prevent Elasticsearch from allocating the shard.","affected_resources":[".fleet-actions-7"],"help_url":"http://ela.st/diagnose-shards"}]},"disk":{"status":"red","symptom":"33 indices are not allowed to be updated. 1 node is out of disk or running low on disk space.","details":{"indices_with_readonly_block":33,"nodes_with_enough_disk_space":0,"nodes_with_unknown_disk_status":0,"nodes_over_high_watermark":0,"nodes_over_flood_stage_watermark":1},"impacts":[{"id":"elasticsearch:health:disk:impact:ingest_capability_unavailable","severity":1,"description":"Cannot insert or update documents in the affected indices [.kibana_security_session_1, .security-7, .kibana_8.5.0_001, .kibana_task_manager_8.5.0_001, .apm-agent-configuration, .apm-custom-link, .ds-.logs-deprecation.elasticsearch-default-2022.11.25-000001, .ds-ilm-history-5-2022.11.25-000001, .ds-logs-elastic_agent-default-2022.11.25-000001, .ds-logs-elastic_agent.filebeat-default-2022.11.25-000001, ...].","impact_areas":["ingest"]},{"id":"elasticsearch:health:disk:impact:cluster_stability_at_risk","severity":1,"description":"Cluster stability might be impaired.","impact_areas":["deployment_management"]},{"id":"elasticsearch:health:disk:impact:cluster_functionality_unavailable","severity":3,"description":"The [ingest, ml, remote_cluster_client, transform] functionality might be impaired.","impact_areas":["deployment_management"]}],"diagnosis":[{"id":"elasticsearch:health:disk:diagnosis:add_disk_capacity_data_nodes","cause":"33 indices reside on nodes that have run or are likely to run out of disk space, this can temporarily disable writing on these indices.","action":"Enable autoscaling (if applicable), add disk capacity or free up disk space to resolve this. If you have already taken action please wait for the rebalancing to complete.","affected_resources":["PWBH3euxQn2wZwg0OgeCzQ"],"help_url":"https://ela.st/fix-data-disk"}]},"ilm":{"status":"green","symptom":"Index Lifecycle Management is running","details":{"policies":25,"ilm_status":"RUNNING"}},"slm":{"status":"green","symptom":"No Snapshot Lifecycle Management policies configured","details":{"slm_status":"RUNNING","policies":0}}}}
17 changes: 17 additions & 0 deletions, new recorded response under internal/elasticsearch/testdata/elasticsearch-8-5-red-out-of-disk
@@ -0,0 +1,17 @@
+{
+  "name" : "008309953ac4",
+  "cluster_name" : "elasticsearch",
+  "cluster_uuid" : "vYXuo7eQR-ikBlJfH3kQaQ",
+  "version" : {
+    "number" : "8.5.0-SNAPSHOT",
+    "build_flavor" : "default",
+    "build_type" : "docker",
+    "build_hash" : "77b936e44234defdde3c7ded0d1ad9ae5e288f77",
+    "build_date" : "2022-10-29T04:11:27.132517622Z",
+    "build_snapshot" : true,
+    "lucene_version" : "9.4.1",
+    "minimum_wire_compatibility_version" : "7.17.0",
+    "minimum_index_compatibility_version" : "7.0.0"
+  },
+  "tagline" : "You Know, for Search"
+}