Skip to content

Commit

Permalink
Adding xpack code for ES cluster stats metricset (#7810)
Browse files Browse the repository at this point in the history
This PR teaches the `elasticsearch/cluster_stats` metricset to query the appropriate Elasticsearch HTTP APIs and index `cluster_stats` documents into `.monitoring-es-6-mb-*` indices. These documents should be structurally compatible with the `cluster_stats` documents that the internal monitoring method currently indexes into `.monitoring-es-6-*` indices.
  • Loading branch information
ycombinator authored and ruflin committed Aug 17, 2018
1 parent 7635731 commit 264e7b4
Show file tree
Hide file tree
Showing 4 changed files with 339 additions and 3 deletions.
8 changes: 7 additions & 1 deletion metricbeat/helper/elastic/elastic.go
Expand Up @@ -82,7 +82,13 @@ func MakeXPackMonitoringIndexName(product Product) string {
// ReportErrorForMissingField reports and returns an error message for the given
// field being missing in API response received from a given product.
//
// The error is built by MakeErrorForMissingField, handed to the reporter through
// r.Error, and then returned to the caller as well.
func ReportErrorForMissingField(field string, product Product, r mb.ReporterV2) error {
	err := MakeErrorForMissingField(field, product)
	r.Error(err)
	return err
}

// MakeErrorForMissingField builds, without reporting it, the error describing
// that the given field is missing from the stats API response of the given
// product. The product name is title-cased in the message.
func MakeErrorForMissingField(field string, product Product) error {
	productName := strings.Title(product.String())
	return fmt.Errorf("Could not find field '%v' in %v stats API response", field, productName)
}
Expand Up @@ -63,7 +63,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {

// Not master, no event sent
if !isMaster {
logp.Debug("elasticsearch", "Trying to fetch index recovery stats from a non master node.")
logp.Debug("elasticsearch", "Trying to fetch cluster stats from a non master node.")
return
}

Expand All @@ -73,5 +73,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
return
}

eventMapping(r, content)
if m.MetricSet.XPack {
eventMappingXPack(r, m, content)
} else {
eventMapping(r, content)
}
}
239 changes: 239 additions & 0 deletions metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
@@ -0,0 +1,239 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cluster_stats

import (
"encoding/json"
"fmt"
"hash/fnv"
"sort"
"strings"
"time"

"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/module/elasticsearch"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
)

// passthruField copies the value found at fieldPath in sourceData to the same
// path in targetData.
//
// It returns a "missing field" error when fieldPath cannot be resolved in
// sourceData, and an error when the value cannot be stored in targetData.
// It returns nil once the value has been written.
func passthruField(fieldPath string, sourceData, targetData common.MapStr) error {
	fieldValue, err := sourceData.GetValue(fieldPath)
	if err != nil {
		return elastic.MakeErrorForMissingField(fieldPath, elastic.Elasticsearch)
	}

	// Put reports its own failures; surface them instead of dropping them so
	// a value that was never written is not mistaken for a success.
	if _, err := targetData.Put(fieldPath, fieldValue); err != nil {
		return fmt.Errorf("could not set field '%v': %v", fieldPath, err)
	}
	return nil
}

// clusterNeedsTLSEnabled reports whether TLS still has to be enabled on the
// cluster. It returns true only when the license type is "trial", security is
// enabled, and SSL on the transport protocol is not enabled. A missing field
// or a value of an unexpected type yields (false, error).
func clusterNeedsTLSEnabled(license, stackStats common.MapStr) (bool, error) {
	const (
		licenseTypeKey     = "license.type"
		securityEnabledKey = "security.enabled"
		transportSSLKey    = "security.ssl.transport.enabled"
	)

	// Any license type other than trial never needs TLS to be enabled.
	rawLicenseType, err := license.GetValue(licenseTypeKey)
	if err != nil {
		return false, elastic.MakeErrorForMissingField(licenseTypeKey, elastic.Elasticsearch)
	}
	licenseType, isString := rawLicenseType.(string)
	if !isString {
		return false, fmt.Errorf("license type is not a string")
	}
	if licenseType != "trial" {
		return false, nil
	}

	// With security disabled there is nothing to protect with TLS.
	rawSecurity, err := stackStats.GetValue(securityEnabledKey)
	if err != nil {
		return false, elastic.MakeErrorForMissingField(securityEnabledKey, elastic.Elasticsearch)
	}
	securityOn, isBool := rawSecurity.(bool)
	if !isBool {
		return false, fmt.Errorf("security enabled flag is not a boolean")
	}
	if !securityOn {
		return false, nil
	}

	// TLS is only "needed" when the transport protocol does not have it yet.
	rawTransportSSL, err := stackStats.GetValue(transportSSLKey)
	if err != nil {
		return false, elastic.MakeErrorForMissingField(transportSSLKey, elastic.Elasticsearch)
	}
	transportSSLOn, isBool := rawTransportSSL.(bool)
	if !isBool {
		return false, fmt.Errorf("transport protocol SSL enabled flag is not a boolean")
	}

	return !transportSSLOn, nil
}

// computeNodesHash derives a simple hash from the "ephemeral_id" of every entry
// under the "nodes" key of clusterState, so that two reports can be compared to
// tell whether the nodes listing changed.
//
// It returns an error when "nodes" is missing or is not a map, or when any
// entry is not a map, lacks "ephemeral_id", or holds a non-string ID.
func computeNodesHash(clusterState common.MapStr) (int32, error) {
	rawNodes, err := clusterState.GetValue("nodes")
	if err != nil {
		return 0, elastic.MakeErrorForMissingField("nodes", elastic.Elasticsearch)
	}

	nodeEntries, isMap := rawNodes.(map[string]interface{})
	if !isMap {
		return 0, fmt.Errorf("nodes is not a map")
	}

	var ephemeralIDs []string
	for _, rawNode := range nodeEntries {
		node, isMap := rawNode.(map[string]interface{})
		if !isMap {
			return 0, fmt.Errorf("node data is not a map")
		}

		rawID, found := node["ephemeral_id"]
		if !found {
			return 0, fmt.Errorf("node data does not contain ephemeral ID")
		}

		id, isString := rawID.(string)
		if !isString {
			return 0, fmt.Errorf("node ephemeral ID is not a string")
		}

		ephemeralIDs = append(ephemeralIDs, id)
	}

	// Map iteration order is not stable, so sort before concatenating to keep
	// the hash independent of that order.
	sort.Strings(ephemeralIDs)

	return hash(strings.Join(ephemeralIDs, "")), nil
}

// hash returns the 32-bit FNV-1 hash of s, reinterpreted as a signed integer.
func hash(s string) int32 {
	hasher := fnv.New32()
	_, _ = hasher.Write([]byte(s))

	// The ES mapping for this value is a 32-bit *signed* integer, hence the
	// conversion from the unsigned sum.
	unsignedSum := hasher.Sum32()
	return int32(unsignedSum)
}

// apmIndicesExist reports whether any index listed under
// "routing_table.indices" in clusterState has a name starting with "apm-".
// It returns an error when that field is missing or is not a map.
func apmIndicesExist(clusterState common.MapStr) (bool, error) {
	const indicesKey = "routing_table.indices"

	rawIndices, err := clusterState.GetValue(indicesKey)
	if err != nil {
		return false, elastic.MakeErrorForMissingField(indicesKey, elastic.Elasticsearch)
	}

	indexEntries, isMap := rawIndices.(map[string]interface{})
	if !isMap {
		return false, fmt.Errorf("routing table indices is not a map")
	}

	found := false
	for indexName := range indexEntries {
		if strings.HasPrefix(indexName, "apm-") {
			found = true
			break
		}
	}

	return found, nil
}

// eventMappingXPack builds one "cluster_stats" monitoring event from the
// cluster stats API response held in content and sends it through r.
//
// Besides decoding content, it queries Elasticsearch through m.HTTP for the
// cluster info, the license, the cluster state and the stack usage, in that
// order. The first failure aborts the function and is returned; no event is
// sent in that case. On success the event is addressed to the X-Pack
// monitoring index for Elasticsearch and nil is returned.
func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
	var decoded map[string]interface{}
	if err := json.Unmarshal(content, &decoded); err != nil {
		return err
	}
	clusterStats := common.MapStr(decoded)

	rawClusterName, err := clusterStats.GetValue("cluster_name")
	if err != nil {
		return elastic.MakeErrorForMissingField("cluster_name", elastic.Elasticsearch)
	}
	clusterName, isString := rawClusterName.(string)
	if !isString {
		return fmt.Errorf("cluster name is not a string")
	}

	info, err := elasticsearch.GetInfo(m.HTTP, m.HTTP.GetURI())
	if err != nil {
		return err
	}

	license, err := elasticsearch.GetLicense(m.HTTP, m.HTTP.GetURI())
	if err != nil {
		return err
	}

	clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI())
	if err != nil {
		return err
	}

	// The cluster state document carries the "status" taken from cluster stats.
	if err := passthruField("status", clusterStats, clusterState); err != nil {
		return err
	}

	nodesHash, err := computeNodesHash(clusterState)
	if err != nil {
		return err
	}
	clusterState.Put("nodes_hash", nodesHash)

	usage, err := elasticsearch.GetStackUsage(m.HTTP, m.HTTP.GetURI())
	if err != nil {
		return err
	}

	needsTLS, err := clusterNeedsTLSEnabled(license, usage)
	if err != nil {
		return err
	}
	// This powers a cluster alert for enabling TLS on the ES transport protocol.
	license.Put("cluster_needs_tls", needsTLS)

	apmFound, err := apmIndicesExist(clusterState)
	if err != nil {
		return err
	}
	// The routing table was only needed for the APM lookup above; we don't
	// want to index it in monitoring indices.
	delete(clusterState, "routing_table")

	r.Event(mb.Event{
		RootFields: common.MapStr{
			"cluster_uuid":  info.ClusterID,
			"cluster_name":  clusterName,
			"timestamp":     common.Time(time.Now()),
			"interval_ms":   m.Module().Config().Period / time.Millisecond,
			"type":          "cluster_stats",
			"license":       license,
			"version":       info.Version.Number,
			"cluster_stats": clusterStats,
			"cluster_state": clusterState,
			"stack_stats": map[string]interface{}{
				"xpack": usage,
				"apm": map[string]interface{}{
					"found": apmFound,
				},
			},
		},
		Index: elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch),
	})

	return nil
}
87 changes: 87 additions & 0 deletions metricbeat/module/elasticsearch/elasticsearch.go
Expand Up @@ -21,7 +21,10 @@ import (
"encoding/json"
"fmt"
"net/url"
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper"
)

Expand All @@ -32,6 +35,9 @@ var clusterIDCache = map[string]string{}
type Info struct {
// ClusterName is decoded from the "cluster_name" JSON key.
ClusterName string `json:"cluster_name"`
// ClusterID is decoded from the "cluster_uuid" JSON key.
ClusterID string `json:"cluster_uuid"`
// Version is decoded from the "version" JSON object; only its "number" key
// is mapped, into Version.Number.
Version struct {
Number string `json:"number"`
} `json:"version"`
}

// NodeInfo struct contains data about the node
Expand Down Expand Up @@ -167,3 +173,84 @@ func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error
}
return nil, fmt.Errorf("no node matched id %s", nodeID)
}

// GetLicense returns license information. Since we don't expect license information
// to change frequently, the information is cached for 1 minute to avoid
// hitting Elasticsearch frequently.
//
// On a cache miss the license is fetched from the "_xpack/license" path via
// fetchPath, decoded from JSON and stored in the package-level licenseCache.
// A fetch or decode failure is returned as-is and leaves the cache untouched.
func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) {
	// First, check the cache
	if cached := licenseCache.get(); cached != nil {
		return cached, nil
	}

	// Not cached, fetch license from Elasticsearch
	content, err := fetchPath(http, resetURI, "_xpack/license")
	if err != nil {
		return nil, err
	}

	var license common.MapStr
	if err := json.Unmarshal(content, &license); err != nil {
		return nil, err
	}

	// Cache license for a minute
	licenseCache.set(license, time.Minute)

	// Return the value that was just decoded instead of reading the cache
	// again: a second lookup re-evaluates the TTL (and re-takes the lock), so
	// it could hand back a nil license together with a nil error.
	return license, nil
}

// GetClusterState fetches the version, master_node, nodes and routing_table
// sections of the cluster state and returns them decoded from JSON. A fetch
// failure yields (nil, err); a decode failure is returned together with
// whatever was decoded.
func GetClusterState(http *helper.HTTP, resetURI string) (common.MapStr, error) {
	const statePath = "_cluster/state/version,master_node,nodes,routing_table"

	body, err := fetchPath(http, resetURI, statePath)
	if err != nil {
		return nil, err
	}

	var state map[string]interface{}
	decodeErr := json.Unmarshal(body, &state)
	return common.MapStr(state), decodeErr
}

// GetStackUsage fetches the "_xpack/usage" path and returns its JSON body
// decoded into a map. A fetch failure yields (nil, err); a decode failure is
// returned together with whatever was decoded.
func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) {
	const usagePath = "_xpack/usage"

	body, err := fetchPath(http, resetURI, usagePath)
	if err != nil {
		return nil, err
	}

	var usage map[string]interface{}
	decodeErr := json.Unmarshal(body, &usage)
	return common.MapStr(usage), decodeErr
}

// Global cache for license information. Assumption is that license information changes infrequently.
// It is a single package-level instance that starts out empty.
var licenseCache = &_licenseCache{}

// _licenseCache holds one license document together with the time it was
// stored and how long it stays valid. The embedded mutex is taken by the
// get and set methods.
type _licenseCache struct {
sync.RWMutex
// license is the cached document; nil when nothing is cached or the entry expired.
license common.MapStr
// cachedOn is the time at which license was last stored by set.
cachedOn time.Time
// ttl is how long after cachedOn the entry is still returned by get.
ttl time.Duration
}

// get returns the cached license, or nil when nothing is cached or the entry
// is older than its TTL. An expired entry is cleared as a side effect, which
// is why the exclusive lock is taken.
func (c *_licenseCache) get() common.MapStr {
	c.Lock()
	defer c.Unlock()

	// Past the TTL: invalidate the entry so the caller fetches a fresh one.
	if expired := time.Since(c.cachedOn) > c.ttl; expired {
		c.license = nil
	}

	return c.license
}

// set stores license in the cache, valid for ttl counted from the moment of
// the call. Any previously cached value is replaced.
func (c *_licenseCache) set(license common.MapStr, ttl time.Duration) {
	c.Lock()
	defer c.Unlock()

	c.cachedOn = time.Now()
	c.ttl = ttl
	c.license = license
}

0 comments on commit 264e7b4

Please sign in to comment.