Skip to content

Commit

Permalink
[Ingest Management] Agent expose metrics (elastic#22793)
Browse files Browse the repository at this point in the history
* [Ingest Manager] Log level reloadable from fleet (elastic#22690)

[Ingest Manager] Log level reloadable from fleet (elastic#22690)

* aa

* create drop

* updated drop

* process contains everything

* drop start time

* undo exposed endpoint

* sanitize dataset name

* ups

* agent expose http

* collect all metrics from beats

* colelct all from beats

* golint

* cleaner docs

* updated structure

* cgroup

* long live file saving issues
  • Loading branch information
michalpristas committed Dec 14, 2020
1 parent 4f4a553 commit 49c8d87
Show file tree
Hide file tree
Showing 11 changed files with 301 additions and 12 deletions.
5 changes: 3 additions & 2 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/cloudid"
"github.com/elastic/beats/v7/libbeat/cmd/instance/metrics"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/common/reload"
Expand Down Expand Up @@ -237,7 +238,7 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool) (*Beat, error) {
Name: hostname,
Hostname: hostname,
ID: id,
EphemeralID: ephemeralID,
EphemeralID: metrics.EphemeralID(),
},
Fields: fields,
}
Expand Down Expand Up @@ -316,7 +317,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
reg = monitoring.Default.NewRegistry("libbeat")
}

err = setupMetrics(b.Info.Beat)
err = metrics.SetupMetrics(b.Info.Beat)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

// +build darwin,cgo freebsd,cgo linux windows

package instance
package metrics

import (
"fmt"
Expand Down Expand Up @@ -46,7 +46,7 @@ func init() {
systemMetrics = monitoring.Default.NewRegistry("system")
}

func setupMetrics(name string) error {
func SetupMetrics(name string) error {
monitoring.NewFunc(systemMetrics, "cpu", reportSystemCPUUsage, monitoring.Report)

//if the beat name is longer than 15 characters, truncate it so we don't fail process checks later on
Expand Down Expand Up @@ -102,6 +102,7 @@ func reportMemStats(m monitoring.Mode, V monitoring.Visitor) {
monitoring.ReportInt(V, "memory_total", int64(stats.TotalAlloc))
if m == monitoring.Full {
monitoring.ReportInt(V, "memory_alloc", int64(stats.Alloc))
monitoring.ReportInt(V, "memory_sys", int64(stats.Sys))
monitoring.ReportInt(V, "gc_next", int64(stats.NextGC))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package instance
package metrics

import (
"time"
Expand Down Expand Up @@ -43,6 +43,11 @@ func init() {
}
}

// EphemeralID returns generated EphemeralID
func EphemeralID() uuid.UUID {
return ephemeralID
}

func reportInfo(_ monitoring.Mode, V monitoring.Visitor) {
V.OnRegistryStart()
defer V.OnRegistryFinished()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

// +build linux freebsd,cgo

package instance
package metrics

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// +build !linux
// +build !freebsd !cgo

package instance
package metrics

// FDUsage is only supported on Linux and FreeBSD.
func setupLinuxBSDFDMetrics() {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

// +build windows

package instance
package metrics

import (
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

// +build !windows

package instance
package metrics

// Counting number of open handles is only supported on Windows.
func setupWindowsHandlesMetrics() {}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
// +build !freebsd !cgo
// +build !linux,!windows

package instance
package metrics

import (
"github.com/elastic/beats/v7/libbeat/logp"
)

func setupMetrics(name string) error {
func SetupMetrics(name string) error {
logp.Warn("Metrics not implemented for this OS.")
return nil
}
99 changes: 98 additions & 1 deletion x-pack/elastic-agent/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@ package cmd

import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"path/filepath"
"runtime"
"strings"
"syscall"

"github.com/spf13/cobra"

"github.com/elastic/beats/v7/libbeat/api"
"github.com/elastic/beats/v7/libbeat/cmd/instance/metrics"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/service"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
Expand All @@ -28,6 +35,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/beats"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)

Expand Down Expand Up @@ -133,6 +141,12 @@ func run(flags *globalFlags, streams *cli.IOStreams) error { // Windows: Mark se
return err
}

serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS())
if err != nil {
return err
}
defer serverStopFn()

if err := app.Start(); err != nil {
return err
}
Expand Down Expand Up @@ -234,3 +248,86 @@ func defaultLogLevel(cfg *configuration.Configuration) string {

return defaultLogLevel
}

func setupMetrics(agentInfo *info.AgentInfo, logger *logger.Logger, operatingSystem string) (func() error, error) {
// use libbeat to setup metrics
if err := metrics.SetupMetrics(agentName); err != nil {
return nil, err
}

// start server for stats
endpointConfig := api.Config{
Enabled: true,
Host: beats.AgentMonitoringEndpoint(operatingSystem),
}

// create agent config path
createAgentMonitoringDrop(endpointConfig.Host)

cfg, err := common.NewConfigFrom(endpointConfig)
if err != nil {
return nil, err
}

s, err := exposeMetricsEndpoint(logger, cfg, monitoring.GetNamespace)
if err != nil {
return nil, errors.New(err, "could not start the HTTP server for the API")
}
s.Start()

// return server stopper
return s.Stop, nil
}

func createAgentMonitoringDrop(drop string) error {
if drop == "" || runtime.GOOS == "windows" {
return nil
}

path := strings.TrimPrefix(drop, "unix://")
if strings.HasSuffix(path, ".sock") {
path = filepath.Dir(path)
}

_, err := os.Stat(path)
if err != nil {
if !os.IsNotExist(err) {
return err
}

// create
if err := os.MkdirAll(path, 0775); err != nil {
return err
}
}

return os.Chown(path, os.Geteuid(), os.Getegid())
}

func exposeMetricsEndpoint(log *logger.Logger, config *common.Config, ns func(string) *monitoring.Namespace) (*api.Server, error) {
mux := http.NewServeMux()

makeAPIHandler := func(ns *monitoring.Namespace) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

data := monitoring.CollectStructSnapshot(
ns.GetRegistry(),
monitoring.Full,
false,
)

bytes, err := json.Marshal(data)
var content string
if err != nil {
content = fmt.Sprintf("Not valid json: %v", err)
} else {
content = string(bytes)
}
fmt.Fprintf(w, content)
}
}

mux.HandleFunc("/stats", makeAPIHandler(ns("stats")))
return api.New(log, mux, config)
}
Loading

0 comments on commit 49c8d87

Please sign in to comment.