Skip to content

Commit

Permalink
don't use prot.Features, send on 1st checkin, adjust tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ committed Feb 10, 2023
1 parent ffe0b9b commit 3a2ae19
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 13 deletions.
55 changes: 42 additions & 13 deletions pkg/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ const (
TriggerStateChange // state_change_triggered
)

type FeatureFQDN struct {
Enabled bool
}
type Features struct {
FQDN FeatureFQDN
}

// UnitChanged is what is sent over the UnitChanged channel any time a change happens:
// - a unit is added, modified, or removed
// - a feature flag config or state changes
Expand All @@ -59,7 +66,7 @@ type UnitChanged struct {
// Unit is any change in a unit.
Unit *Unit
// Features are all the feature flags and their configs.
Features *proto.Features
Features Features
}

// AgentInfo is the information about the running Elastic Agent that the client is connected to.
Expand Down Expand Up @@ -131,7 +138,7 @@ type clientV2 struct {
errCh chan error
changesCh chan UnitChanged
featuresMu sync.Mutex
features *proto.Features
features Features
unitsMu sync.RWMutex
units []*Unit

Expand All @@ -158,7 +165,6 @@ func NewV2(target string, token string, versionInfo VersionInfo, opts ...grpc.Di
changesCh: make(chan UnitChanged),
diagHooks: make(map[string]diagHook),
minCheckTimeout: CheckinMinimumTimeout,
features: &proto.Features{},
}
c.registerDefaultDiagnostics()
return c
Expand Down Expand Up @@ -369,12 +375,6 @@ func (c *clientV2) sync(expected *proto.CheckinExpected) {
c.agentInfoMu.Unlock()
}

if expected.Features != nil {
c.featuresMu.Lock()
c.features = &proto.Features{Fqdn: expected.Features.Fqdn}
c.featuresMu.Unlock()
}

c.syncUnits(expected)
}

Expand All @@ -388,10 +388,14 @@ func (c *clientV2) syncUnits(expected *proto.CheckinExpected) {
c.units[i] = unit
i++
} else {
c.changesCh <- UnitChanged{
changed := UnitChanged{
Type: UnitChangedRemoved,
Unit: unit,
}

changed = c.syncFeatures(expected, changed)

c.changesCh <- changed
removed = true
}
}
Expand All @@ -411,10 +415,15 @@ func (c *clientV2) syncUnits(expected *proto.CheckinExpected) {
agentUnit.ConfigStateIdx,
c)
c.units = append(c.units, unit)
c.changesCh <- UnitChanged{

changed := UnitChanged{
Type: UnitChangedAdded,
Unit: unit,
}

changed = c.syncFeatures(expected, changed)

c.changesCh <- changed
} else {
// existing unit
triggers := unit.updateState(
Expand All @@ -423,22 +432,42 @@ func (c *clientV2) syncUnits(expected *proto.CheckinExpected) {
agentUnit.Config,
agentUnit.ConfigStateIdx)
if triggers != nil { // a.k.a something changed

c.changesCh <- UnitChanged{
changed := UnitChanged{
Triggers: triggers,
Type: UnitChangedModified,
Unit: unit,
}

changed = c.syncFeatures(expected, changed)

c.changesCh <- changed
}
}
}

if removed {
// unit removed send updated observed change so agent is notified now
// otherwise it will not be notified until the next checkin timeout
c.unitChanged()
}
}

func (c *clientV2) syncFeatures(
expected *proto.CheckinExpected, changed UnitChanged) UnitChanged {

c.featuresMu.Lock()
defer c.featuresMu.Unlock()

if expected.Features != nil &&
c.features.FQDN.Enabled != expected.Features.Fqdn.Enabled {
c.features.FQDN.Enabled = expected.Features.Fqdn.Enabled
changed.Features.FQDN.Enabled = c.features.FQDN.Enabled
changed.Triggers = append(changed.Triggers, TriggerFeature)
}

return changed
}

// findUnit finds an existing unit.
func (c *clientV2) findUnit(id string, unitType UnitType) *Unit {
for _, unit := range c.units {
Expand Down
30 changes: 30 additions & 0 deletions pkg/client/client_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestClientV2_Checkin_Initial(t *testing.T) {
gotValid := false
unitOneID := mock.NewID()
unitTwoID := mock.NewID()
wantFQDN := true
reportedVersion := VersionInfo{}
srv := mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
Expand All @@ -54,6 +55,9 @@ func TestClientV2_Checkin_Initial(t *testing.T) {
Version: "8.5.0",
Snapshot: true,
},
Features: &proto.Features{
Fqdn: &proto.FQDNFeature{Enabled: wantFQDN},
},
Units: []*proto.UnitExpected{
{
Id: unitOneID,
Expand Down Expand Up @@ -125,12 +129,19 @@ func TestClientV2_Checkin_Initial(t *testing.T) {
// receive the units
var unitsMu sync.Mutex
var units []*Unit
var gotFQDN bool
go func() {
for {
select {
case <-ctx.Done():
return
case change := <-validClient.UnitChanged():
for _, t := range change.Triggers {
if t == TriggerFeature {
gotFQDN = change.Features.FQDN.Enabled
}
}

switch change.Type {
case UnitChangedAdded:
unitsMu.Lock()
Expand Down Expand Up @@ -170,10 +181,13 @@ func TestClientV2_Checkin_Initial(t *testing.T) {
assert.Equal(t, agentInfo.Version, "8.5.0")
assert.True(t, agentInfo.Snapshot)

assert.Equal(t, wantFQDN, gotFQDN)

assert.Equal(t, units[0].ID(), unitOneID)
assert.Equal(t, units[0].Type(), UnitTypeOutput)
assert.Equal(t, units[1].ID(), unitTwoID)
assert.Equal(t, units[1].Type(), UnitTypeInput)

assert.Equal(t, reportedVersion.Name, "program")
assert.Equal(t, reportedVersion.Version, "v1.0.0")
assert.Equal(t, reportedVersion.Meta, map[string]string{
Expand All @@ -187,6 +201,7 @@ func TestClientV2_Checkin_UnitState(t *testing.T) {
connected := false
unitOne := newUnit(mock.NewID(), UnitTypeOutput, UnitStateStarting, UnitLogLevelInfo, nil, 0, nil)
unitTwo := newUnit(mock.NewID(), UnitTypeInput, UnitStateStarting, UnitLogLevelInfo, nil, 0, nil)
wantFQDN := true
srv := mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
m.Lock()
Expand Down Expand Up @@ -224,6 +239,7 @@ func TestClientV2_Checkin_UnitState(t *testing.T) {
} else if (unitOne.state == UnitStateHealthy && unitTwo.state == UnitStateHealthy) || (unitOne.state == UnitStateHealthy && unitTwo.state == UnitStateStopping) {
// stop second input
return &proto.CheckinExpected{
Features: &proto.Features{Fqdn: &proto.FQDNFeature{Enabled: wantFQDN}},
Units: []*proto.UnitExpected{
{
Id: unitOne.id,
Expand All @@ -246,6 +262,7 @@ func TestClientV2_Checkin_UnitState(t *testing.T) {
} else if unitOne.state == UnitStateHealthy && unitTwo.state == UnitStateStopped {
// input stopped, remove the unit
return &proto.CheckinExpected{
Features: &proto.Features{Fqdn: &proto.FQDNFeature{Enabled: wantFQDN}},
Units: []*proto.UnitExpected{
{
Id: unitOne.id,
Expand Down Expand Up @@ -284,12 +301,20 @@ func TestClientV2_Checkin_UnitState(t *testing.T) {
// receive the units
var unitsMu sync.Mutex
units := make(map[string]*Unit)
var gotFQDN bool
var gotTriggers []Trigger
go func() {
for {
select {
case <-ctx.Done():
return
case change := <-client.UnitChanged():
for _, t := range change.Triggers {
if t == TriggerFeature {
gotFQDN = change.Features.FQDN.Enabled
}
}

switch change.Type {
case UnitChangedAdded:
unitsMu.Lock()
Expand All @@ -300,6 +325,9 @@ func TestClientV2_Checkin_UnitState(t *testing.T) {
})
case UnitChangedModified:
state, _, _ := change.Unit.Expected()
gotFQDN = change.Features.FQDN.Enabled
gotTriggers = change.Triggers

if state == UnitStateStopped {
change.Unit.UpdateState(UnitStateStopping, "Stopping", nil)
go func() {
Expand Down Expand Up @@ -347,6 +375,8 @@ func TestClientV2_Checkin_UnitState(t *testing.T) {
return nil
}))

assert.Equal(t, wantFQDN, gotFQDN)
assert.Contains(t, gotTriggers, TriggerFeature)
assert.Equal(t, UnitStateHealthy, unitOne.state)
assert.Equal(t, "Healthy", unitOne.stateMsg)
assert.Equal(t, UnitStateStopped, unitTwo.state)
Expand Down

0 comments on commit 3a2ae19

Please sign in to comment.