Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Add skeleton for client/server for agent control protocol #20163

Merged
merged 9 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/addr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !windows

package control

import (
"fmt"
"path/filepath"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
)

// Address returns the address to connect to Elastic Agent daemon.
func Address() string {
data := paths.Data()
return fmt.Sprintf("unix://%s", filepath.Join(data, "agent.sock"))
}
22 changes: 22 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/addr_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build windows

package control

import (
"crypto/sha256"
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
)

// Address returns the address to connect to Elastic Agent daemon.
func Address() string {
data = paths.Data()
// entire string cannot be longer than 256 characters, this forces the
// length to always be 87 characters (but unique per data path)
return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

careful with the length here, i think it should be fine with 64b for sha256 but maybe a comment/reminder in case somebody wants to change format would be handy

}
188 changes: 188 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package client

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto"
)

// Status is the status of the Elastic Agent
type Status = proto.Status

const (
// Starting is when the it is still starting.
Starting Status = proto.Status_STARTING
// Configuring is when it is configuring.
Configuring Status = proto.Status_CONFIGURING
// Healthy is when it is healthy.
Healthy Status = proto.Status_HEALTHY
// Degraded is when it is degraded.
Degraded Status = proto.Status_DEGRADED
// Failed is when it is failed.
Failed Status = proto.Status_FAILED
// Stopping is when it is stopping.
Stopping Status = proto.Status_STOPPING
// Upgrading is when it is upgrading.
Upgrading Status = proto.Status_UPGRADING
)

// Version is the current running version of the daemon.
type Version struct {
Version string
Commit string
BuildTime time.Time
Snapshot bool
}

// ApplicationStatus is a status of an application inside of Elastic Agent.
type ApplicationStatus struct {
ID string
Name string
Status Status
Message string
Payload map[string]interface{}
}

// AgentStatus is the current status of the Elastic Agent.
type AgentStatus struct {
Status Status
Message string
Applications []*ApplicationStatus
}

// Client communicates to Elastic Agent through the control protocol.
type Client interface {
// Start starts the client.
Start(ctx context.Context) error
// Stop stops the client.
Stop()
// Version returns the current version of the running agent.
Version(ctx context.Context) (Version, error)
// Status returns the current status of the running agent.
Status(ctx context.Context) (*AgentStatus, error)
// Restart triggers restarting the current running daemon.
Restart(ctx context.Context) error
// Upgrade triggers upgrade of the current running daemon.
Upgrade(ctx context.Context, version string, sourceURI string) (string, error)
}

// client manages the state and communication to the Elastic Agent.
type client struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client proto.ElasticAgentClient
cfgLock sync.RWMutex
obsLock sync.RWMutex
}

// New creates a client connection to Elastic Agent.
func New() Client {
return &client{}
}

// Start starts the connection to Elastic Agent.
func (c *client) Start(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
conn, err := dialContext(ctx)
if err != nil {
return err
}
c.client = proto.NewElasticAgentClient(conn)
return nil
}

// Stop stops the connection to Elastic Agent.
func (c *client) Stop() {
if c.cancel != nil {
c.cancel()
c.wg.Wait()
c.ctx = nil
c.cancel = nil
}
}

// Version returns the current version of the running agent.
func (c *client) Version(ctx context.Context) (Version, error) {
res, err := c.client.Version(ctx, &proto.Empty{})
if err != nil {
return Version{}, err
}
bt, err := time.Parse(control.TimeFormat(), res.BuildTime)
if err != nil {
return Version{}, err
}
return Version{
Version: res.Version,
Commit: res.Commit,
BuildTime: bt,
Snapshot: res.Snapshot,
}, nil
}

// Status returns the current status of the running agent.
func (c *client) Status(ctx context.Context) (*AgentStatus, error) {
res, err := c.client.Status(ctx, &proto.Empty{})
if err != nil {
return nil, err
}
s := &AgentStatus{
Status: res.Status,
Message: res.Message,
Applications: make([]*ApplicationStatus, len(res.Applications)),
}
for i, appRes := range res.Applications {
var payload map[string]interface{}
if appRes.Payload != "" {
err := json.Unmarshal([]byte(appRes.Payload), &payload)
if err != nil {
return nil, err
}
}
s.Applications[i] = &ApplicationStatus{
ID: appRes.Id,
Name: appRes.Name,
Status: appRes.Status,
Message: appRes.Message,
Payload: payload,
}
}
return s, nil
}

// Restart triggers restarting the current running daemon.
func (c *client) Restart(ctx context.Context) error {
res, err := c.client.Restart(ctx, &proto.Empty{})
if err != nil {
return err
}
if res.Status == proto.ActionStatus_FAILURE {
return fmt.Errorf(res.Error)
}
return nil
}

// Upgrade triggers upgrade of the current running daemon.
func (c *client) Upgrade(ctx context.Context, version string, sourceURI string) (string, error) {
res, err := c.client.Upgrade(ctx, &proto.UpgradeRequest{
Version: version,
SourceURI: sourceURI,
})
if err != nil {
return "", err
}
if res.Status == proto.ActionStatus_FAILURE {
return "", fmt.Errorf(res.Error)
}
return res.Version, nil
}
26 changes: 26 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/client/dial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !windows

package client

import (
"context"
"net"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"

"google.golang.org/grpc"
)

func dialContext(ctx context.Context) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, strings.TrimPrefix(control.Address(), "unix://"), grpc.WithInsecure(), grpc.WithContextDialer(dialer))
}

func dialer(ctx context.Context, addr string) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, "unix", addr)
}
26 changes: 26 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build windows

package client

import (
"context"
"net"

"google.golang.org/grpc"

"github.com/elastic/beats/v7/libbeat/api/npipe"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"
)

func dialContext(ctx context.Context) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, control.Address(), grpc.WithInsecure(), grpc.WithContextDialer(dialer))
}

func dialer(ctx context.Context, addr string) (net.Conn, error) {
return npipe.DialContext(arr)(ctx, "", "")
}
53 changes: 53 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/control_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package control_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)

func TestServerClient_Version(t *testing.T) {
srv := server.New(newErrorLogger(t))
err := srv.Start()
require.NoError(t, err)
defer srv.Stop()

c := client.New()
err = c.Start(context.Background())
require.NoError(t, err)
defer c.Stop()

ver, err := c.Version(context.Background())
require.NoError(t, err)

assert.Equal(t, client.Version{
Version: release.Version(),
Commit: release.Commit(),
BuildTime: release.BuildTime(),
Snapshot: release.Snapshot(),
}, ver)
}

func newErrorLogger(t *testing.T) *logger.Logger {
t.Helper()

loggerCfg := logger.DefaultLoggingConfig()
loggerCfg.Level = logp.ErrorLevel

log, err := logger.NewFromConfig("", loggerCfg)
require.NoError(t, err)
return log
}
38 changes: 38 additions & 0 deletions x-pack/elastic-agent/pkg/agent/control/server/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !windows

package server

import (
"net"
"os"
"path/filepath"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"
)

func createListener() (net.Listener, error) {
path := strings.TrimPrefix(control.Address(), "unix://")
dir := filepath.Dir(path)
if _, err := os.Stat(dir); os.IsNotExist(err) {
err = os.MkdirAll(dir, 0755)
if err != nil {
return nil, err
}
}
lis, err := net.Listen("unix", path)
if err != nil {
return nil, err
}
err = os.Chmod(path, 0700)
if err != nil {
// failed to set permissions (close listener)
lis.Close()
return nil, err
}
return lis, err
}