Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ committed Feb 7, 2023
1 parent c43cc82 commit 7cbf7a7
Show file tree
Hide file tree
Showing 8 changed files with 470 additions and 402 deletions.
3 changes: 2 additions & 1 deletion dev-tools/v2tool/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
"os/exec"
"time"

"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/pkg/core/process"
"gopkg.in/yaml.v2"
)

// InputManager carries all the logic needed run a V2 client,
Expand Down
8 changes: 0 additions & 8 deletions elastic-agent-client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -294,21 +294,13 @@ message CheckinAgentInfo {
bool snapshot = 3;
}

message Feature {
// The original unparsed configuration
google.protobuf.Struct source = 1;
// Pull out the enabled flag for convenience since it is always necessary.
bool enabled = 2;
}

// A set of units and their expected states and configuration.
message CheckinExpected {
// Units is the expected units the component should be running.
repeated UnitExpected units = 1;
// Agent info is provided only on first CheckinExpected response to the component.
CheckinAgentInfo agent_info = 2;

map<string, Feature> features = 3;
}

// Observed status for a unit.
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,9 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
18 changes: 15 additions & 3 deletions pkg/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (c *clientV2) sendObserved(client proto.ElasticAgent_CheckinV2Client) error

// syncUnits syncs the expected units with the current state.
func (c *clientV2) syncUnits(expected *proto.CheckinExpected) {
// here!!! send hapens here
// here!!! send happens here
c.unitsMu.Lock()
defer c.unitsMu.Unlock()
i := 0
Expand All @@ -379,15 +379,27 @@ func (c *clientV2) syncUnits(expected *proto.CheckinExpected) {
unit := c.findUnit(agentUnit.Id, UnitType(agentUnit.Type))
if unit == nil {
// new unit
unit = newUnit(agentUnit.Id, UnitType(agentUnit.Type), UnitState(agentUnit.State), UnitLogLevel(agentUnit.LogLevel), agentUnit.Config, agentUnit.ConfigStateIdx, c)
unit = newUnit(
agentUnit.Id,
UnitType(agentUnit.Type),
UnitState(agentUnit.State),
UnitLogLevel(agentUnit.LogLevel),
expected.Features,
agentUnit.Config,
agentUnit.ConfigStateIdx, c)
c.units = append(c.units, unit)
c.unitsCh <- UnitChanged{
Type: UnitChangedAdded,
Unit: unit,
}
} else {
// existing unit
if unit.updateState(UnitState(agentUnit.State), UnitLogLevel(agentUnit.LogLevel), agentUnit.Config, agentUnit.ConfigStateIdx) {
if unit.updateState(
UnitState(agentUnit.State),
UnitLogLevel(agentUnit.LogLevel),
expected.Features,
agentUnit.Config,
agentUnit.ConfigStateIdx) {
c.unitsCh <- UnitChanged{
Type: UnitChangedModified,
Unit: unit,
Expand Down
6 changes: 3 additions & 3 deletions pkg/client/client_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ func TestClientV2_Checkin_UnitState(t *testing.T) {
var m sync.Mutex
token := mock.NewID()
connected := false
unitOne := newUnit(mock.NewID(), UnitTypeOutput, UnitStateStarting, UnitLogLevelInfo, nil, 0, nil)
unitTwo := newUnit(mock.NewID(), UnitTypeInput, UnitStateStarting, UnitLogLevelInfo, nil, 0, nil)
unitOne := newUnit(mock.NewID(), UnitTypeOutput, UnitStateStarting, UnitLogLevelInfo, nil, nil, 0, nil)
unitTwo := newUnit(mock.NewID(), UnitTypeInput, UnitStateStarting, UnitLogLevelInfo, nil, nil, 0, nil)
srv := mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
m.Lock()
Expand Down Expand Up @@ -299,7 +299,7 @@ func TestClientV2_Checkin_UnitState(t *testing.T) {
"custom": "payload",
})
case UnitChangedModified:
state, _, _ := change.Unit.Expected()
state, _, _, _ := change.Unit.Expected()
if state == UnitStateStopped {
change.Unit.UpdateState(UnitStateStopping, "Stopping", nil)
go func() {
Expand Down
35 changes: 29 additions & 6 deletions pkg/client/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type Unit struct {
logLevel UnitLogLevel
config *proto.UnitExpectedConfig
configIdx uint64
features *proto.Features

stateMu sync.RWMutex
state UnitState
Expand All @@ -146,11 +147,11 @@ func (u *Unit) Type() UnitType {
return u.unitType
}

// Expected returns the expected state and config for the unit.
func (u *Unit) Expected() (UnitState, UnitLogLevel, *proto.UnitExpectedConfig) {
// Expected returns the expected state, log leve, features and config for the unit.
func (u *Unit) Expected() (UnitState, UnitLogLevel, *proto.Features, *proto.UnitExpectedConfig) {
u.expMu.RLock()
defer u.expMu.RUnlock()
return u.exp, u.logLevel, u.config
return u.exp, u.logLevel, u.features, u.config
}

// State returns the currently reported state for the unit.
Expand Down Expand Up @@ -251,25 +252,37 @@ func (u *Unit) RegisterDiagnosticHook(name string, description string, filename
}

// updateConfig updates the configuration for this unit, triggering the delegate function if set.
func (u *Unit) updateState(exp UnitState, logLevel UnitLogLevel, cfg *proto.UnitExpectedConfig, cfgIdx uint64) bool {
func (u *Unit) updateState(exp UnitState,
logLevel UnitLogLevel,
features *proto.Features,
cfg *proto.UnitExpectedConfig,
cfgIdx uint64) bool {

u.expMu.Lock()
defer u.expMu.Unlock()
changed := false

if u.exp != exp {
u.exp = exp
changed = true
}

if u.logLevel != logLevel {
u.logLevel = logLevel
changed = true
}

if u.configIdx != cfgIdx {
u.configIdx = cfgIdx
if !gproto.Equal(u.config.GetSource(), cfg.GetSource()) {
if !gproto.Equal(u.config.GetSource(), cfg.GetSource()) ||
!gproto.Equal(u.features, features) {
u.config = cfg
u.features = features

changed = true
}
}

return changed
}

Expand All @@ -291,12 +304,22 @@ func (u *Unit) toObserved() *proto.UnitObserved {
}

// newUnit creates a new unit that needs to be created in this process.
func newUnit(id string, unitType UnitType, exp UnitState, logLevel UnitLogLevel, cfg *proto.UnitExpectedConfig, cfgIdx uint64, client *clientV2) *Unit {
func newUnit(
id string,
unitType UnitType,
exp UnitState,
logLevel UnitLogLevel,
features *proto.Features,
cfg *proto.UnitExpectedConfig,
cfgIdx uint64,
client *clientV2) *Unit {

return &Unit{
id: id,
unitType: unitType,
config: cfg,
configIdx: cfgIdx,
features: features,
exp: exp,
logLevel: logLevel,
state: UnitStateStarting,
Expand Down
11 changes: 6 additions & 5 deletions pkg/client/unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ package client
import (
"testing"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/stretchr/testify/require"
gproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
)

var defaultTest = Unit{
Expand All @@ -35,7 +36,7 @@ func TestUnitUpdateWithSameMap(t *testing.T) {
require.NoError(t, err)

// This should return false, as the two underlying maps in `source` are the same
result := defaultTest.updateState(UnitStateHealthy, UnitLogLevelDebug, newUnit, 2)
result := defaultTest.updateState(UnitStateHealthy, UnitLogLevelDebug, nil, newUnit, 2)
require.False(t, result)
}

Expand All @@ -55,16 +56,16 @@ func TestUnitUpdateWithNewMap(t *testing.T) {
require.NoError(t, err)

// This should return true, as we have an actually new map
result := defaultTest.updateState(UnitStateHealthy, UnitLogLevelDebug, newUnit, 2)
result := defaultTest.updateState(UnitStateHealthy, UnitLogLevelDebug, nil, newUnit, 2)
require.True(t, result)
}

func TestUnitUpdateLog(t *testing.T) {
result := defaultTest.updateState(UnitStateHealthy, UnitLogLevelInfo, &proto.UnitExpectedConfig{}, 2)
result := defaultTest.updateState(UnitStateHealthy, UnitLogLevelInfo, nil, &proto.UnitExpectedConfig{}, 2)
require.True(t, result)
}

func TestUnitUpdateState(t *testing.T) {
result := defaultTest.updateState(UnitStateStopped, UnitLogLevelInfo, &proto.UnitExpectedConfig{}, 2)
result := defaultTest.updateState(UnitStateStopped, UnitLogLevelInfo, nil, &proto.UnitExpectedConfig{}, 2)
require.True(t, result)
}
Loading

0 comments on commit 7cbf7a7

Please sign in to comment.