From 8aeac783048de9de62feb01deb63010962f62904 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 22 Jul 2020 11:11:00 -0400 Subject: [PATCH 1/9] Add protocl to control Elastic Agent. --- x-pack/elastic-agent/magefile.go | 2 +- x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/x-pack/elastic-agent/magefile.go b/x-pack/elastic-agent/magefile.go index 638537f3d7c..6fd0558707b 100644 --- a/x-pack/elastic-agent/magefile.go +++ b/x-pack/elastic-agent/magefile.go @@ -331,7 +331,7 @@ func commitID() string { // Update is an alias for executing control protocol, configs, and specs. func Update() { - mg.SerialDeps(Config, BuildSpec, BuildFleetCfg) + mg.SerialDeps(ControlProto, Config, BuildSpec, BuildFleetCfg) } // CrossBuild cross-builds the beat for all target platforms. diff --git a/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go b/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go index 58df5e28f19..dff25d2a2bd 100644 --- a/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go +++ b/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go @@ -12,15 +12,14 @@ package proto import ( context "context" - reflect "reflect" - sync "sync" - proto "github.com/golang/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) const ( From c899dc862cf3934c8fbee8d28b77e2da0181e493 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 22 Jul 2020 14:29:24 -0400 Subject: [PATCH 2/9] Fix CI with protoc. --- .travis.yml | 6 ++++++ x-pack/elastic-agent/Dockerfile | 2 ++ x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go | 5 +++-- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 7809fe380b7..7dc91871af3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -265,6 +265,9 @@ jobs: # Agent - os: linux before_install: .ci/scripts/travis_has_changes.sh x-pack/elastic-agent libbeat || travis_terminate 0 + install: + - go get github.com/golang/protobuf/proto + - go get github.com/golang/protobuf/protoc-gen-go env: - PROJECT=x-pack/elastic-agent - MAGE='build test' @@ -272,6 +275,9 @@ jobs: stage: test - os: osx before_install: .ci/scripts/travis_has_changes.sh x-pack/elastic-agent libbeat || travis_terminate 0 + install: + - go get github.com/golang/protobuf/proto + - go get github.com/golang/protobuf/protoc-gen-go env: - PROJECT=x-pack/elastic-agent - MAGE='build unitTest' diff --git a/x-pack/elastic-agent/Dockerfile b/x-pack/elastic-agent/Dockerfile index 8636ff37944..214b56e7497 100644 --- a/x-pack/elastic-agent/Dockerfile +++ b/x-pack/elastic-agent/Dockerfile @@ -5,6 +5,8 @@ FROM circleci/golang:${GO_VERSION} ARG TEST_RESULTS=/tmp/test-results RUN mkdir -p ${TEST_RESULTS} && mkdir -p ./code +RUN go get github.com/golang/protobuf/proto +RUN go get github.com/golang/protobuf/protoc-gen-go RUN go get github.com/magefile/mage ENV GO111MODULE=on diff --git a/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go b/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go index dff25d2a2bd..58df5e28f19 100644 --- a/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go +++ b/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go @@ -12,14 +12,15 @@ package proto import ( context "context" + reflect "reflect" + sync "sync" + proto "github.com/golang/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" ) const ( From 3874dc5168c4e6b96db9f9597baa9ad0d333edc4 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 22 Jul 2020 15:44:05 -0400 Subject: [PATCH 3/9] Remove CI changes. --- .travis.yml | 6 ------ x-pack/elastic-agent/Dockerfile | 2 -- x-pack/elastic-agent/magefile.go | 2 +- 3 files changed, 1 insertion(+), 9 deletions(-) diff --git a/.travis.yml b/.travis.yml index 7dc91871af3..7809fe380b7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -265,9 +265,6 @@ jobs: # Agent - os: linux before_install: .ci/scripts/travis_has_changes.sh x-pack/elastic-agent libbeat || travis_terminate 0 - install: - - go get github.com/golang/protobuf/proto - - go get github.com/golang/protobuf/protoc-gen-go env: - PROJECT=x-pack/elastic-agent - MAGE='build test' @@ -275,9 +272,6 @@ jobs: stage: test - os: osx before_install: .ci/scripts/travis_has_changes.sh x-pack/elastic-agent libbeat || travis_terminate 0 - install: - - go get github.com/golang/protobuf/proto - - go get github.com/golang/protobuf/protoc-gen-go env: - PROJECT=x-pack/elastic-agent - MAGE='build unitTest' diff --git a/x-pack/elastic-agent/Dockerfile b/x-pack/elastic-agent/Dockerfile index 214b56e7497..8636ff37944 100644 --- a/x-pack/elastic-agent/Dockerfile +++ b/x-pack/elastic-agent/Dockerfile @@ -5,8 +5,6 @@ FROM circleci/golang:${GO_VERSION} ARG TEST_RESULTS=/tmp/test-results RUN mkdir -p ${TEST_RESULTS} && mkdir -p ./code -RUN go get github.com/golang/protobuf/proto -RUN go get github.com/golang/protobuf/protoc-gen-go RUN go get github.com/magefile/mage ENV GO111MODULE=on diff --git a/x-pack/elastic-agent/magefile.go b/x-pack/elastic-agent/magefile.go index 6fd0558707b..638537f3d7c 100644 --- a/x-pack/elastic-agent/magefile.go +++ b/x-pack/elastic-agent/magefile.go @@ -331,7 +331,7 @@ func commitID() string { // Update is an alias for executing control protocol, configs, and specs. func Update() { - mg.SerialDeps(ControlProto, Config, BuildSpec, BuildFleetCfg) + mg.SerialDeps(Config, BuildSpec, BuildFleetCfg) } // CrossBuild cross-builds the beat for all target platforms. From cd4d43f46130e032f1e0e44a9fa6d2b1ac858c6c Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 22 Jul 2020 14:03:52 -0400 Subject: [PATCH 4/9] Start on the control server code. --- .../elastic-agent/pkg/agent/control/common.go | 19 +++++ .../pkg/agent/control/common_windows.go | 20 +++++ .../pkg/agent/control/server/listener.go | 17 ++++ .../agent/control/server/listener_windows.go | 32 ++++++++ .../pkg/agent/control/server/server.go | 78 +++++++++++++++++++ 5 files changed, 166 insertions(+) create mode 100644 x-pack/elastic-agent/pkg/agent/control/common.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/common_windows.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/server/listener.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/server/server.go diff --git a/x-pack/elastic-agent/pkg/agent/control/common.go b/x-pack/elastic-agent/pkg/agent/control/common.go new file mode 100644 index 00000000000..d2bbfc50d3e --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/common.go @@ -0,0 +1,19 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !windows + +package control + +import ( + "fmt" + "path/filepath" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" +) + +func Address() string { + data := paths.Data() + return fmt.Sprintf("unix://%s", filepath.Join(data, "agent.sock")) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/common_windows.go b/x-pack/elastic-agent/pkg/agent/control/common_windows.go new file mode 100644 index 00000000000..6c087fecea9 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/common_windows.go @@ -0,0 +1,20 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build windows + +package control + +import ( + "crypto/sha256" + "fmt" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" +) + +// Address control address for both server and client. +func Address() string { + data = paths.Data() + return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data)) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener.go b/x-pack/elastic-agent/pkg/agent/control/server/listener.go new file mode 100644 index 00000000000..3dfa44fd10d --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !windows + +package server + +import ( + "net" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" +) + +func createListener() (net.Listener, error) { + return net.Listen("unix", control.Address()) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go new file mode 100644 index 00000000000..c326e166475 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go @@ -0,0 +1,32 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build windows + +package server + +import ( + "crypto/sha256" + "fmt" + "net" + "os/user" + + "github.com/elastic/beats/v7/libbeat/api/npipe" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" +) + +// createListener creates a named pipe listener on Windows +func createListener() (net.Listener, error) { + u, err := user.Current() + if err != nil { + return nil, err + } + sd, err := npipe.DefaultSD(u.Username) + if err != nil { + return nil, err + } + return npipe.NewListener(control.Address(), sd) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go new file mode 100644 index 00000000000..0f5aa9e862e --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -0,0 +1,78 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "context" + "net" + + "google.golang.org/grpc" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" +) + +type Server struct { + logger *logger.Logger + listener net.Listener + server *grpc.Server +} + +func New(log *logger.Logger) *Server { + return &Server{ + logger: log, + } +} + +// Start starts the GRPC endpoint and accepts new connections. +func (s *Server) Start() error { + if s.server != nil { + // already started + return nil + } + + lis, err := createListener() + if err != nil { + return err + } + s.listener = lis + s.server = grpc.NewServer() + proto.RegisterElasticAgentServer(s.server, s) + + // start serving GRPC connections + go func() { + err := s.server.Serve(lis) + if err != nil { + s.logger.Errorf("error listening for GRPC: %s", err) + } + }() + + return nil +} + +// Stop stops the GRPC endpoint. +func (s *Server) Stop() { + if s.server != nil { + s.server.Stop() + s.server = nil + s.listener = nil + } +} + +func (s *Server) Version(ctx context.Context, empty *proto.Empty) (*proto.VersionResponse, error) { + panic("implement me") +} + +func (s *Server) Status(ctx context.Context, empty *proto.Empty) (*proto.StatusResponse, error) { + panic("implement me") +} + +func (s *Server) Restart(ctx context.Context, empty *proto.Empty) (*proto.Empty, error) { + panic("implement me") +} + +func (s *Server) Upgrade(ctx context.Context, request *proto.UpgradeRequest) (*proto.UpgradeResponse, error) { + panic("implement me") +} From c746f6001a341077f2c082040f4730ebb77a8dae Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 22 Jul 2020 15:15:33 -0400 Subject: [PATCH 5/9] More client/server work. --- .../pkg/agent/control/{common.go => addr.go} | 0 .../{common_windows.go => addr_windows.go} | 0 .../pkg/agent/control/client/client.go | 177 ++++++++++++++++++ .../pkg/agent/control/client/dial.go | 18 ++ .../pkg/agent/control/server/server.go | 35 +++- .../elastic-agent/pkg/agent/control/time.go | 10 + 6 files changed, 233 insertions(+), 7 deletions(-) rename x-pack/elastic-agent/pkg/agent/control/{common.go => addr.go} (100%) rename x-pack/elastic-agent/pkg/agent/control/{common_windows.go => addr_windows.go} (100%) create mode 100644 x-pack/elastic-agent/pkg/agent/control/client/client.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/client/dial.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/time.go diff --git a/x-pack/elastic-agent/pkg/agent/control/common.go b/x-pack/elastic-agent/pkg/agent/control/addr.go similarity index 100% rename from x-pack/elastic-agent/pkg/agent/control/common.go rename to x-pack/elastic-agent/pkg/agent/control/addr.go diff --git a/x-pack/elastic-agent/pkg/agent/control/common_windows.go b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go similarity index 100% rename from x-pack/elastic-agent/pkg/agent/control/common_windows.go rename to x-pack/elastic-agent/pkg/agent/control/addr_windows.go diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go new file mode 100644 index 00000000000..54fa32715fd --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -0,0 +1,177 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package client + +import ( + "context" + "encoding/json" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + "sync" + "time" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" +) + +// Status is the status of the Elastic Agent +type Status = proto.Status + +const ( + // Starting is when the it is still starting. + Starting Status = proto.Status_STARTING + // Configuring is when it is configuring. + Configuring Status = proto.Status_CONFIGURING + // Healthy is when it is healthy. + Healthy Status = proto.Status_HEALTHY + // Degraded is when it is degraded. + Degraded Status = proto.Status_DEGRADED + // Failed is when it is failed. + Failed Status = proto.Status_FAILED + // Stopping is when it is stopping. + Stopping Status = proto.Status_STOPPING + // Upgrading is when it is upgrading. + Upgrading Status = proto.Status_UPGRADING +) + +// Version is the current running version of the daemon. +type Version struct { + Version string + Commit string + BuildTime time.Time + Snapshot bool +} + +// ApplicationStatus is a status of an application inside of Elastic Agent. +type ApplicationStatus struct { + ID string + Name string + Status Status + Message string + Payload map[string]interface{} +} + +// AgentStatus is the current status of the Elastic Agent. +type AgentStatus struct { + Status Status + Message string + Applications []*ApplicationStatus +} + +// Client communicates to Elastic Agent through the control protocol. +type Client interface { + // Start starts the client. + Start(ctx context.Context) error + // Stop stops the client. + Stop() + // Version returns the current version of the running agent. + Version(ctx context.Context) (Version, error) + // Status returns the current status of the running agent. + Status(ctx context.Context) (*AgentStatus, error) + // Restart triggers restarting the current running daemon. + Restart(ctx context.Context) error + // Upgrade triggers upgrade of the current running daemon. + Upgrade(ctx context.Context, version string, sourceURI string) (string, error) +} + +// client manages the state and communication to the Elastic Agent. +type client struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + client proto.ElasticAgentClient + cfgLock sync.RWMutex + obsLock sync.RWMutex +} + +// New creates a client connection to Elastic Agent. +func New() Client { + return &client{} +} + +// Start starts the connection to Elastic Agent. +func (c *client) Start(ctx context.Context) error { + c.ctx, c.cancel = context.WithCancel(ctx) + conn, err := dialContext(ctx) + if err != nil { + return err + } + c.client = proto.NewElasticAgentClient(conn) + return nil +} + +// Stop stops the connection to Elastic Agent. +func (c *client) Stop() { + if c.cancel != nil { + c.cancel() + c.wg.Wait() + c.ctx = nil + c.cancel = nil + } +} + +// Version returns the current version of the running agent. +func (c *client) Version(ctx context.Context) (Version, error) { + res, err := c.client.Version(ctx, &proto.Empty{}) + if err != nil { + return Version{}, err + } + bt, err := time.Parse(control.TimeFormat(), res.BuildTime) + if err != nil { + return Version{}, err + } + return Version{ + Version: res.Version, + Commit: res.Commit, + BuildTime: bt, + Snapshot: res.Snapshot, + }, nil +} + +// Status returns the current status of the running agent. +func (c *client) Status(ctx context.Context) (*AgentStatus, error) { + res, err := c.client.Status(ctx, &proto.Empty{}) + if err != nil { + return nil, err + } + s := &AgentStatus{ + Status: res.Status, + Message: res.Message, + Applications: make([]*ApplicationStatus, 0), + } + for _, appRes := range res.Applications { + var payload map[string]interface{} + if appRes.Payload != "" { + err := json.Unmarshal([]byte(appRes.Payload), &payload) + if err != nil { + return nil, err + } + } + s.Applications = append(s.Applications, &ApplicationStatus{ + ID: appRes.Id, + Name: appRes.Name, + Status: appRes.Status, + Message: appRes.Message, + Payload: payload, + }) + } + return s, nil +} + +// Restart triggers restarting the current running daemon. +func (c *client) Restart(ctx context.Context) error { + _, err := c.client.Restart(ctx, &proto.Empty{}) + return err +} + +// Upgrade triggers upgrade of the current running daemon. +func (c *client) Upgrade(ctx context.Context, version string, sourceURI string) (string, error) { + res, err := c.client.Upgrade(ctx, &proto.UpgradeRequest{ + Version: version, + SourceURI: sourceURI, + }) + if err != nil { + return "", err + } + return res.Version, nil +} diff --git a/x-pack/elastic-agent/pkg/agent/control/client/dial.go b/x-pack/elastic-agent/pkg/agent/control/client/dial.go new file mode 100644 index 00000000000..c2300c9d55b --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/client/dial.go @@ -0,0 +1,18 @@ +package client + +import ( + "context" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + "net" + + "google.golang.org/grpc" +) + +func dialContext(ctx context.Context) (*grpc.ClientConn, error) { + return grpc.DialContext(ctx, control.Address(), grpc.WithInsecure(), grpc.WithContextDialer(dialer)) +} + +func dialer(ctx context.Context, addr string) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, "unix", addr) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 0f5aa9e862e..312cf430ea9 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -6,6 +6,8 @@ package server import ( "context" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" "net" "google.golang.org/grpc" @@ -61,18 +63,37 @@ func (s *Server) Stop() { } } -func (s *Server) Version(ctx context.Context, empty *proto.Empty) (*proto.VersionResponse, error) { - panic("implement me") +// Version returns the currently running version. +func (s *Server) Version(_ context.Context, _ *proto.Empty) (*proto.VersionResponse, error) { + return &proto.VersionResponse{ + Version: release.Version(), + Commit: release.Commit(), + BuildTime: release.BuildTime().Format(control.TimeFormat()), + Snapshot: release.Snapshot(), + }, nil } -func (s *Server) Status(ctx context.Context, empty *proto.Empty) (*proto.StatusResponse, error) { - panic("implement me") +// Status returns the overall status of the agent. +func (s *Server) Status(_ context.Context, _ *proto.Empty) (*proto.StatusResponse, error) { + // not implemented + return &proto.StatusResponse{ + Status: proto.Status_HEALTHY, + Message: "not implemented", + Applications: nil, + }, nil } -func (s *Server) Restart(ctx context.Context, empty *proto.Empty) (*proto.Empty, error) { - panic("implement me") +// Restart performs re-exec. +func (s *Server) Restart(_ context.Context, _ *proto.Empty) (*proto.Empty, error) { + // not implemented + return &proto.Empty{}, nil } +// Upgrade performs the upgrade operation. func (s *Server) Upgrade(ctx context.Context, request *proto.UpgradeRequest) (*proto.UpgradeResponse, error) { - panic("implement me") + // not implemented + return &proto.UpgradeResponse{ + Status: proto.UpgradeResponse_FAILURE, + Version: release.Version(), + }, nil } diff --git a/x-pack/elastic-agent/pkg/agent/control/time.go b/x-pack/elastic-agent/pkg/agent/control/time.go new file mode 100644 index 00000000000..c87902bbc37 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/time.go @@ -0,0 +1,10 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package control + +// TimeFormat returns the time format shared between the protocol. +func TimeFormat() string { + return "2006-01-02 15:04:05 -0700 MST" +} From 8ed86c1962f1e9c5591164299fea563468cded20 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 22 Jul 2020 15:22:56 -0400 Subject: [PATCH 6/9] More work. --- .../pkg/agent/control/client/client.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go index 54fa32715fd..6af2f27b13c 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/client.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -7,6 +7,7 @@ package client import ( "context" "encoding/json" + "fmt" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" "sync" "time" @@ -160,8 +161,14 @@ func (c *client) Status(ctx context.Context) (*AgentStatus, error) { // Restart triggers restarting the current running daemon. func (c *client) Restart(ctx context.Context) error { - _, err := c.client.Restart(ctx, &proto.Empty{}) - return err + res, err := c.client.Restart(ctx, &proto.Empty{}) + if err != nil { + return err + } + if res.Status == proto.ActionStatus_FAILURE { + return fmt.Errorf(res.Error) + } + return nil } // Upgrade triggers upgrade of the current running daemon. @@ -173,5 +180,8 @@ func (c *client) Upgrade(ctx context.Context, version string, sourceURI string) if err != nil { return "", err } + if res.Status == proto.ActionStatus_FAILURE { + return "", fmt.Errorf(res.Error) + } return res.Version, nil } From 7b21e09cddeef98cb25d9b0a7eefd5b29d893f20 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 22 Jul 2020 15:40:32 -0400 Subject: [PATCH 7/9] Add test. --- .../pkg/agent/control/client/client.go | 25 ++++----- .../pkg/agent/control/client/dial.go | 12 ++++- .../pkg/agent/control/client/dial_windows.go | 26 +++++++++ .../pkg/agent/control/control_test.go | 53 +++++++++++++++++++ .../pkg/agent/control/server/listener.go | 3 +- .../agent/control/server/listener_windows.go | 3 -- .../pkg/agent/control/server/server.go | 23 ++++---- 7 files changed, 118 insertions(+), 27 deletions(-) create mode 100644 x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/control_test.go diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go index 6af2f27b13c..5ab7b2d95e3 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/client.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -8,10 +8,11 @@ import ( "context" "encoding/json" "fmt" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" "sync" "time" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" ) @@ -37,25 +38,25 @@ const ( // Version is the current running version of the daemon. type Version struct { - Version string - Commit string + Version string + Commit string BuildTime time.Time - Snapshot bool + Snapshot bool } // ApplicationStatus is a status of an application inside of Elastic Agent. type ApplicationStatus struct { - ID string - Name string - Status Status + ID string + Name string + Status Status Message string Payload map[string]interface{} } // AgentStatus is the current status of the Elastic Agent. type AgentStatus struct { - Status Status - Message string + Status Status + Message string Applications []*ApplicationStatus } @@ -149,9 +150,9 @@ func (c *client) Status(ctx context.Context) (*AgentStatus, error) { } } s.Applications = append(s.Applications, &ApplicationStatus{ - ID: appRes.Id, - Name: appRes.Name, - Status: appRes.Status, + ID: appRes.Id, + Name: appRes.Name, + Status: appRes.Status, Message: appRes.Message, Payload: payload, }) diff --git a/x-pack/elastic-agent/pkg/agent/control/client/dial.go b/x-pack/elastic-agent/pkg/agent/control/client/dial.go index c2300c9d55b..56313b12c82 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/dial.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/dial.go @@ -1,15 +1,23 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !windows + package client import ( "context" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" "net" + "strings" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" "google.golang.org/grpc" ) func dialContext(ctx context.Context) (*grpc.ClientConn, error) { - return grpc.DialContext(ctx, control.Address(), grpc.WithInsecure(), grpc.WithContextDialer(dialer)) + return grpc.DialContext(ctx, strings.TrimPrefix(control.Address(), "unix://"), grpc.WithInsecure(), grpc.WithContextDialer(dialer)) } func dialer(ctx context.Context, addr string) (net.Conn, error) { diff --git a/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go b/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go new file mode 100644 index 00000000000..58b36c18043 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go @@ -0,0 +1,26 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build windows + +package client + +import ( + "context" + "net" + + "google.golang.org/grpc" + + "github.com/elastic/beats/v7/libbeat/api/npipe" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" +) + +func dialContext(ctx context.Context) (*grpc.ClientConn, error) { + return grpc.DialContext(ctx, control.Address(), grpc.WithInsecure(), grpc.WithContextDialer(dialer)) +} + +func dialer(ctx context.Context, addr string) (net.Conn, error) { + return npipe.DialContext(arr)(ctx, "", "") +} diff --git a/x-pack/elastic-agent/pkg/agent/control/control_test.go b/x-pack/elastic-agent/pkg/agent/control/control_test.go new file mode 100644 index 00000000000..13d32420258 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/control_test.go @@ -0,0 +1,53 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package control_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" +) + +func TestServerClient_Version(t *testing.T) { + srv := server.New(newErrorLogger(t)) + err := srv.Start() + require.NoError(t, err) + defer srv.Stop() + + c := client.New() + err = c.Start(context.Background()) + require.NoError(t, err) + defer c.Stop() + + ver, err := c.Version(context.Background()) + require.NoError(t, err) + + assert.Equal(t, client.Version{ + Version: release.Version(), + Commit: release.Commit(), + BuildTime: release.BuildTime(), + Snapshot: release.Snapshot(), + }, ver) +} + +func newErrorLogger(t *testing.T) *logger.Logger { + t.Helper() + + loggerCfg := logger.DefaultLoggingConfig() + loggerCfg.Level = logp.ErrorLevel + + log, err := logger.NewFromConfig("", loggerCfg) + require.NoError(t, err) + return log +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener.go b/x-pack/elastic-agent/pkg/agent/control/server/listener.go index 3dfa44fd10d..c2283c722c0 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/listener.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener.go @@ -8,10 +8,11 @@ package server import ( "net" + "strings" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" ) func createListener() (net.Listener, error) { - return net.Listen("unix", control.Address()) + return net.Listen("unix", strings.TrimPrefix(control.Address(), "unix://")) } diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go index c326e166475..d2d2866b98a 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go @@ -7,14 +7,11 @@ package server import ( - "crypto/sha256" - "fmt" "net" "os/user" "github.com/elastic/beats/v7/libbeat/api/npipe" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" ) diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 312cf430ea9..f6b2b1d77ac 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -6,20 +6,21 @@ package server import ( "context" + "net" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" - "net" "google.golang.org/grpc" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) type Server struct { - logger *logger.Logger - listener net.Listener - server *grpc.Server + logger *logger.Logger + listener net.Listener + server *grpc.Server } func New(log *logger.Logger) *Server { @@ -84,16 +85,20 @@ func (s *Server) Status(_ context.Context, _ *proto.Empty) (*proto.StatusRespons } // Restart performs re-exec. -func (s *Server) Restart(_ context.Context, _ *proto.Empty) (*proto.Empty, error) { +func (s *Server) Restart(_ context.Context, _ *proto.Empty) (*proto.RestartResponse, error) { // not implemented - return &proto.Empty{}, nil + return &proto.RestartResponse{ + Status: proto.ActionStatus_FAILURE, + Error: "not implemented", + }, nil } // Upgrade performs the upgrade operation. func (s *Server) Upgrade(ctx context.Context, request *proto.UpgradeRequest) (*proto.UpgradeResponse, error) { // not implemented return &proto.UpgradeResponse{ - Status: proto.UpgradeResponse_FAILURE, - Version: release.Version(), + Status: proto.ActionStatus_FAILURE, + Version: "", + Error: "not implemented", }, nil } From 2f34799b8862e415150eeadb2ef3d64ff9293ae0 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 22 Jul 2020 15:50:00 -0400 Subject: [PATCH 8/9] Fix vet issues. --- x-pack/elastic-agent/pkg/agent/control/addr.go | 1 + x-pack/elastic-agent/pkg/agent/control/addr_windows.go | 2 +- x-pack/elastic-agent/pkg/agent/control/server/server.go | 2 ++ 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/agent/control/addr.go b/x-pack/elastic-agent/pkg/agent/control/addr.go index d2bbfc50d3e..3416480a6a0 100644 --- a/x-pack/elastic-agent/pkg/agent/control/addr.go +++ b/x-pack/elastic-agent/pkg/agent/control/addr.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" ) +// Address returns the address to connect to Elastic Agent daemon. func Address() string { data := paths.Data() return fmt.Sprintf("unix://%s", filepath.Join(data, "agent.sock")) diff --git a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go index 6c087fecea9..f4bc2e9981a 100644 --- a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go +++ b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go @@ -13,7 +13,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" ) -// Address control address for both server and client. +// Address returns the address to connect to Elastic Agent daemon. func Address() string { data = paths.Data() return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data)) diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index f6b2b1d77ac..c9a750808fc 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -17,12 +17,14 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) +// Server is the daemon side of the control protocol. type Server struct { logger *logger.Logger listener net.Listener server *grpc.Server } +// New creates a new control protocol server. func New(log *logger.Logger) *Server { return &Server{ logger: log, From 2928141dfd95c6572fc202c24518617efca35aa6 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 30 Jul 2020 10:43:03 -0400 Subject: [PATCH 9/9] Fix permissions on unix socket. Add comment to Windows npipe. --- .../pkg/agent/control/addr_windows.go | 2 ++ .../pkg/agent/control/client/client.go | 8 +++---- .../pkg/agent/control/server/listener.go | 22 ++++++++++++++++++- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go index f4bc2e9981a..1123eec941b 100644 --- a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go +++ b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go @@ -16,5 +16,7 @@ import ( // Address returns the address to connect to Elastic Agent daemon. func Address() string { data = paths.Data() + // entire string cannot be longer than 256 characters, this forces the + // length to always be 87 characters (but unique per data path) return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data)) } diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go index 5ab7b2d95e3..bcd8eccdb82 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/client.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -139,9 +139,9 @@ func (c *client) Status(ctx context.Context) (*AgentStatus, error) { s := &AgentStatus{ Status: res.Status, Message: res.Message, - Applications: make([]*ApplicationStatus, 0), + Applications: make([]*ApplicationStatus, len(res.Applications)), } - for _, appRes := range res.Applications { + for i, appRes := range res.Applications { var payload map[string]interface{} if appRes.Payload != "" { err := json.Unmarshal([]byte(appRes.Payload), &payload) @@ -149,13 +149,13 @@ func (c *client) Status(ctx context.Context) (*AgentStatus, error) { return nil, err } } - s.Applications = append(s.Applications, &ApplicationStatus{ + s.Applications[i] = &ApplicationStatus{ ID: appRes.Id, Name: appRes.Name, Status: appRes.Status, Message: appRes.Message, Payload: payload, - }) + } } return s, nil } diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener.go b/x-pack/elastic-agent/pkg/agent/control/server/listener.go index c2283c722c0..2dd5d54a46f 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/listener.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener.go @@ -8,11 +8,31 @@ package server import ( "net" + "os" + "path/filepath" "strings" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" ) func createListener() (net.Listener, error) { - return net.Listen("unix", strings.TrimPrefix(control.Address(), "unix://")) + path := strings.TrimPrefix(control.Address(), "unix://") + dir := filepath.Dir(path) + if _, err := os.Stat(dir); os.IsNotExist(err) { + err = os.MkdirAll(dir, 0755) + if err != nil { + return nil, err + } + } + lis, err := net.Listen("unix", path) + if err != nil { + return nil, err + } + err = os.Chmod(path, 0700) + if err != nil { + // failed to set permissions (close listener) + lis.Close() + return nil, err + } + return lis, err }