Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.6] Reduce memory usage of elasticsearch.index metricset (#16538) #17070

Merged
merged 3 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Revert changes in `docker` module: add size flag to docker.container. {pull}16600[16600]
- Convert increments of 100 nanoseconds/ticks to milliseconds for WriteTime and ReadTime in diskio metricset (Windows) for consistency. {issue}14233[14233]
- Fix diskio issue for windows 32 bit on disk_performance struct alignment. {issue}16680[16680]
- Reduce memory usage in `elasticsearch/index` metricset. {issue}16503[16503] {pull}16538[16538]

*Packetbeat*

Expand Down
210 changes: 114 additions & 96 deletions metricbeat/module/elasticsearch/index/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,96 +27,117 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

var (
// Based on https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java#L127-L203
xpackSchema = s.Schema{
"uuid": c.Str("uuid"),
"primaries": c.Dict("primaries", indexStatsSchema),
"total": c.Dict("total", indexStatsSchema),
}
errParse = errors.New("failure parsing Indices Stats Elasticsearch API response")
)

indexStatsSchema = s.Schema{
"docs": c.Dict("docs", s.Schema{
"count": c.Int("count"),
}),
"fielddata": c.Dict("fielddata", s.Schema{
"memory_size_in_bytes": c.Int("memory_size_in_bytes"),
"evictions": c.Int("evictions"),
}),
"indexing": c.Dict("indexing", s.Schema{
"index_total": c.Int("index_total"),
"index_time_in_millis": c.Int("index_time_in_millis"),
"throttle_time_in_millis": c.Int("throttle_time_in_millis"),
}),
"merges": c.Dict("merges", s.Schema{
"total_size_in_bytes": c.Int("total_size_in_bytes"),
}),
"query_cache": c.Dict("query_cache", cacheStatsSchema),
"request_cache": c.Dict("request_cache", cacheStatsSchema),
"search": c.Dict("search", s.Schema{
"query_total": c.Int("query_total"),
"query_time_in_millis": c.Int("query_time_in_millis"),
}),
"segments": c.Dict("segments", s.Schema{
"count": c.Int("count"),
"memory_in_bytes": c.Int("memory_in_bytes"),
"terms_memory_in_bytes": c.Int("terms_memory_in_bytes"),
"stored_fields_memory_in_bytes": c.Int("stored_fields_memory_in_bytes"),
"term_vectors_memory_in_bytes": c.Int("term_vectors_memory_in_bytes"),
"norms_memory_in_bytes": c.Int("norms_memory_in_bytes"),
"points_memory_in_bytes": c.Int("points_memory_in_bytes"),
"doc_values_memory_in_bytes": c.Int("doc_values_memory_in_bytes"),
"index_writer_memory_in_bytes": c.Int("index_writer_memory_in_bytes"),
"version_map_memory_in_bytes": c.Int("version_map_memory_in_bytes"),
"fixed_bit_set_memory_in_bytes": c.Int("fixed_bit_set_memory_in_bytes"),
}),
"store": c.Dict("store", s.Schema{
"size_in_bytes": c.Int("size_in_bytes"),
}),
"refresh": c.Dict("refresh", s.Schema{
"external_total_time_in_millis": c.Int("external_total_time_in_millis", s.Optional),
"total_time_in_millis": c.Int("total_time_in_millis"),
}),
}
// Based on https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java#L127-L203
// stats mirrors the top-level shape of the Elasticsearch indices stats API
// response: a map from index name to that index's statistics. Decoding into
// typed structs (instead of generic maps) is what reduces memory usage here.
type stats struct {
// Indices holds per-index statistics, keyed by index name.
Indices map[string]index `json:"indices"`
}

cacheStatsSchema = s.Schema{
"memory_size_in_bytes": c.Int("memory_size_in_bytes"),
"evictions": c.Int("evictions"),
"hit_count": c.Int("hit_count"),
"miss_count": c.Int("miss_count"),
}
)
type index struct {
UUID string `json:"uuid"`
Primaries indexStats `json:"primaries"`
Total indexStats `json:"total"`

func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
var indicesStruct IndicesStruct
if err := parseAPIResponse(content, &indicesStruct); err != nil {
return errors.Wrap(err, "failure parsing Indices Stats Elasticsearch API response")
}
Index string `json:"index"`
Created int64 `json:"created"`
Status string `json:"status"`
Shards shardStats `json:"shards"`
}

type indexStats struct {
Docs struct {
Count int `json:"count"`
} `json:"docs"`
FieldData struct {
MemorySizeInBytes int `json:"memory_size_in_bytes"`
Evictions int `json:"evictions"`
} `json:"fielddata"`
Indexing struct {
IndexTotal int `json:"index_total"`
IndexTimeInMillis int `json:"index_time_in_millis"`
ThrottleTimeInMillis int `json:"throttle_time_in_millis"`
} `json:"indexing"`
Merges struct {
TotalSizeInBytes int `json:"total_size_in_bytes"`
} `json:"merges"`
QueryCache cacheStats `json:"query_stats"`
RequestCache cacheStats `json:"request_cache"`
Search struct {
QueryTotal int `json:"query_total"`
QueryTimeInMillis int `json:"query_time_in_millis"`
} `json:"search"`
Segments struct {
Count int `json:"count"`
MemoryInBytes int `json:"memory_in_bytes"`
TermsMemoryInBytes int `json:"terms_memory_in_bytes"`
StoredFieldsMemoryInBytes int `json:"stored_fields_memory_in_bytes"`
TermVectorsMemoryInBytes int `json:"term_vectors_memory_in_bytes"`
NormsMemoryInBytes int `json:"norms_memory_in_bytes"`
PointsMemoryInBytes int `json:"points_memory_in_bytes"`
DocValuesMemoryInBytes int `json:"doc_values_memory_in_bytes"`
IndexWriterMemoryInBytes int `json:"index_writer_memory_in_bytes"`
VersionMapMemoryInBytes int `json:"version_map_memory_in_bytes"`
FixedBitSetMemoryInBytes int `json:"fixed_bit_set_memory_in_bytes"`
} `json:"segments"`
Store struct {
SizeInBytes int `json:"size_in_bytes"`
} `json:"store"`
Refresh struct {
ExternalTotalTimeInMillis int `json:"external_total_time_in_millis"`
TotalTimeInMillis int `json:"total_time_in_millis"`
} `json:"refresh"`
}

type cacheStats struct {
MemorySizeInBytes int `json:"memory_size_in_bytes"`
Evictions int `json:"evictions"`
HitCount int `json:"hit_count"`
MissCount int `json:"miss_count"`
}

type shardStats struct {
Total int `json:"total"`
Primaries int `json:"primaries"`
Replicas int `json:"replicas"`

ActiveTotal int `json:"active_total"`
ActivePrimaries int `json:"active_primaries"`
ActiveReplicas int `json:"active_replicas"`

UnassignedTotal int `json:"unassigned_total"`
UnassignedPrimaries int `json:"unassigned_primaries"`
UnassignedReplicas int `json:"unassigned_replicas"`

Initializing int `json:"initializing"`
Relocating int `json:"relocationg"`
}

func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
clusterStateMetrics := []string{"metadata", "routing_table"}
clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI(), clusterStateMetrics)
if err != nil {
return errors.Wrap(err, "failure retrieving cluster state from Elasticsearch")
}

var indicesStats stats
if err := parseAPIResponse(content, &indicesStats); err != nil {
return errors.Wrap(err, "failure parsing Indices Stats Elasticsearch API response")
}

var errs multierror.Errors
for name, index := range indicesStruct.Indices {
for name, idx := range indicesStats.Indices {
event := mb.Event{}
indexStats, err := xpackSchema.Apply(index)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure applying index stats schema"))
continue
}
indexStats["index"] = name
idx.Index = name

err = addClusterStateFields(name, indexStats, clusterState)
err = addClusterStateFields(&idx, clusterState)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure adding cluster state fields"))
continue
Expand All @@ -127,7 +148,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
"timestamp": common.Time(time.Now()),
"interval_ms": m.Module().Config().Period / time.Millisecond,
"type": "index_stats",
"index_stats": indexStats,
"index_stats": idx,
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)
Expand All @@ -137,19 +158,19 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
return errs.Err()
}

func parseAPIResponse(content []byte, indicesStruct *IndicesStruct) error {
return json.Unmarshal(content, indicesStruct)
// parseAPIResponse decodes the raw JSON body returned by the Elasticsearch
// indices stats API into the provided stats value.
func parseAPIResponse(content []byte, indicesStats *stats) error {
	if err := json.Unmarshal(content, indicesStats); err != nil {
		return err
	}
	return nil
}

// Fields added here are based on same fields being added by internal collection in
// https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java#L62-L124
func addClusterStateFields(indexName string, indexStats, clusterState common.MapStr) error {
indexMetadata, err := getClusterStateMetricForIndex(clusterState, indexName, "metadata")
func addClusterStateFields(idx *index, clusterState common.MapStr) error {
indexMetadata, err := getClusterStateMetricForIndex(clusterState, idx.Index, "metadata")
if err != nil {
return errors.Wrap(err, "failed to get index metadata from cluster state")
}

indexRoutingTable, err := getClusterStateMetricForIndex(clusterState, indexName, "routing_table")
indexRoutingTable, err := getClusterStateMetricForIndex(clusterState, idx.Index, "routing_table")
if err != nil {
return errors.Wrap(err, "failed to get index routing table from cluster state")
}
Expand All @@ -163,7 +184,7 @@ func addClusterStateFields(indexName string, indexStats, clusterState common.Map
if err != nil {
return errors.Wrap(err, "failed to get index creation time")
}
indexStats.Put("created", created)
idx.Created = created

// "index_stats.version.created", <--- don't think this is being used in the UI, so can we skip it?
// "index_stats.version.upgraded", <--- don't think this is being used in the UI, so can we skip it?
Expand All @@ -172,13 +193,13 @@ func addClusterStateFields(indexName string, indexStats, clusterState common.Map
if err != nil {
return errors.Wrap(err, "failed to get index status")
}
indexStats.Put("status", status)
idx.Status = status

shardStats, err := getIndexShardStats(shards)
if err != nil {
return errors.Wrap(err, "failed to get index shard stats")
}
indexStats.Put("shards", shardStats)
idx.Shards = *shardStats
return nil
}

Expand Down Expand Up @@ -241,7 +262,7 @@ func getIndexStatus(shards map[string]interface{}) (string, error) {
return "red", nil
}

func getIndexShardStats(shards common.MapStr) (common.MapStr, error) {
func getIndexShardStats(shards common.MapStr) (*shardStats, error) {
primaries := 0
replicas := 0

Expand Down Expand Up @@ -298,21 +319,18 @@ func getIndexShardStats(shards common.MapStr) (common.MapStr, error) {
}
}

return common.MapStr{
"total": primaries + replicas,
"primaries": primaries,
"replicas": replicas,

"active_total": activePrimaries + activeReplicas,
"active_primaries": activePrimaries,
"active_replicas": activeReplicas,

"unassigned_total": unassignedPrimaries + unassignedReplicas,
"unassigned_primaries": unassignedPrimaries,
"unassigned_replicas": unassignedReplicas,

"initializing": initializing,
"relocating": relocating,
return &shardStats{
Total: primaries + replicas,
Primaries: primaries,
Replicas: replicas,
ActiveTotal: activePrimaries + activeReplicas,
ActivePrimaries: activePrimaries,
ActiveReplicas: activeReplicas,
UnassignedTotal: unassignedPrimaries + unassignedReplicas,
UnassignedPrimaries: unassignedPrimaries,
UnassignedReplicas: unassignedReplicas,
Initializing: initializing,
Relocating: relocating,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/elasticsearch/index/data_xpack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func BenchmarkParseAPIResponse(b *testing.B) {
content, err := ioutil.ReadFile("_meta/test/stats.800.bench.json")
require.NoError(b, err)

var indicesStruct IndicesStruct
var indicesStats stats

for i := 0; i < b.N; i++ {
err = parseAPIResponse(content, &indicesStruct)
err = parseAPIResponse(content, &indicesStats)
require.NoError(b, err)
}

Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func init() {

const (
statsMetrics = "docs,fielddata,indexing,merge,search,segments,store,refresh,query_cache,request_cache"
statsPath = "/_stats/" + statsMetrics
statsPath = "/_stats/" + statsMetrics + "?filter_path=indices"
)

// MetricSet type defines all fields of the MetricSet
Expand Down