Skip to content

Commit

Permalink
Feature flags and Units are a "change"
Browse files Browse the repository at this point in the history
Generalize the unit changed to a general change that can come from a unit or feature flag
  • Loading branch information
AndersonQ committed Feb 8, 2023
1 parent faf3f9b commit 9a742b9
Show file tree
Hide file tree
Showing 11 changed files with 396 additions and 294 deletions.
11 changes: 10 additions & 1 deletion elastic-agent-client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,17 @@ message CheckinAgentInfo {
bool snapshot = 3;
}

// Feature flags configurations.
// Added on Elastic Agent v8.7.1.
message Features {
FQDNFeature fqdn = 1;
// Index of the either current configuration or new configuration provided.
uint64 config_state_idx = 1;

FQDNFeature fqdn = 2;
}

// FQDN feature flag indicates to use FQDN for host.name instead of hostname.
// Added on Elastic Agent v8.7.1.
message FQDNFeature {
bool enabled = 1;
}
Expand All @@ -307,6 +314,8 @@ message CheckinExpected {
// Agent info is provided only on first CheckinExpected response to the component.
CheckinAgentInfo agent_info = 2;

// Features are the expected feature flags configurations.
// Added on Elastic Agent v8.7.1.
Features features = 3;
}

Expand Down
43 changes: 37 additions & 6 deletions magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package main

import (
"fmt"
"os"
"strings"

Expand All @@ -21,6 +22,7 @@ const (
goLicenserRepo = "github.com/elastic/go-licenser"
goProtocGenGo = "google.golang.org/protobuf/cmd/protoc-gen-go@v1.28"
goProtocGenGoGRPC = "google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2"
stringerRepo = "golang.org/x/tools/cmd/stringer@v0.5.0"
)

// Aliases for commands required by master makefile
Expand Down Expand Up @@ -54,9 +56,14 @@ func (Prepare) InstallGoLint() error {
return GoGet(goLintRepo)
}

// All runs prepare:installGoLicenser and prepare:installGoLint.
// InstallStringer install go stringer to generate String methods for constants.
func (Prepare) InstallStringer() error {
return GoInstall(stringerRepo)
}

// All runs prepare:installGoLicenser, prepare:installGoLint and prepare:installGoLint.
func (Prepare) All() {
mg.SerialDeps(Prepare.InstallGoLicenser, Prepare.InstallGoLint)
mg.SerialDeps(Prepare.InstallGoLicenser, Prepare.InstallGoLint, Prepare.InstallStringer)
}

// Prepare installs the required GRPC tools for generation to occur.
Expand All @@ -67,14 +74,38 @@ func (Update) Prepare() error {
return GoInstall(goProtocGenGoGRPC)
}

// Generate generates the GRPC code.
// Generate generates the necessary GRPC and Go code. It generates both,
// then reports all errors if any.
func (Update) Generate() error {
defer mg.SerialDeps(Format.All)
return sh.RunV(

errGRPC := sh.RunV(
"protoc",
"--go_out=pkg/proto", "--go_opt=paths=source_relative",
"--go-grpc_out=pkg/proto", "--go-grpc_opt=paths=source_relative",
"--go_out=pkg/proto",
"--go_opt=paths=source_relative",
"--go-grpc_out=pkg/proto",
"--go-grpc_opt=paths=source_relative",
"elastic-agent-client.proto")
if errGRPC != nil {
errGRPC = fmt.Errorf("failed to generate GRPC code: %w")
}

errGenerate := sh.RunV("go", "generate", "./...")
if errGenerate != nil {
errGenerate = fmt.Errorf("failed to run go generate: %w")
}

switch {
case errGRPC != nil && errGenerate != nil:
return fmt.Errorf("all code generation failed: '%v' and '%v'",
errGRPC, errGenerate)
case errGRPC != nil:
return errGRPC
case errGenerate != nil:
return errGenerate
}

return nil
}

// All runs update:prepare then update:generate.
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/artifact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func TestArtifact(t *testing.T) {
select {
case <-ctx.Done():
return
case change := <-client.UnitChanges():
case change := <-client.Changes():
switch change.Type {
case UnitChangedAdded:
case ChangeUnitAdded:
unitsMu.Lock()
units = append(units, change.Unit)
unitsMu.Unlock()
Expand Down
115 changes: 68 additions & 47 deletions pkg/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,37 @@ import (
"time"

"google.golang.org/grpc"
gproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-client/v7/pkg/utils"
)

// UnitChangedType defines types for when units are adjusted.
type UnitChangedType int
// ChangeType defines types for when units are adjusted.
type ChangeType int

//go:generate stringer -type=ChangeType -linecomment -output client_v2_strings.go
const (
// UnitChangedAdded is when a new unit is added.
UnitChangedAdded UnitChangedType = 1
// UnitChangedModified is when an existing unit is modified.
UnitChangedModified UnitChangedType = 2
// UnitChangedRemoved is when an existing unit is removed.
UnitChangedRemoved UnitChangedType = 3
// ChangeUnitAdded is when a new unit is added.
ChangeUnitAdded ChangeType = iota // unit_added
// ChangeUnitModified is when an existing unit is modified.
ChangeUnitModified // unit_modified
// ChangeUnitRemoved is when an existing unit is removed.
ChangeUnitRemoved // unit_removed
// ChangeFeatureModified is when a feature flag is modified.
ChangeFeatureModified // feature_modified
)

// String returns string representation for the unit changed type.
func (t UnitChangedType) String() string {
switch t {
case UnitChangedAdded:
return "added"
case UnitChangedModified:
return "modified"
case UnitChangedRemoved:
return "removed"
}
return "unknown"
}

// UnitChanged is what is sent over the UnitChanges channel any time a unit is added, modified, or removed.
type UnitChanged struct {
Type UnitChangedType
// Changes is what is sent over the Changes channel any time a change happens:
// - a unit is added, modified, or removed
// - a feature flag config or state changes
type Changes struct {
Type ChangeType
// Unit is any change in a unit.
Unit *Unit
// Features are all the feature flags and their configs.
Features *proto.Features
}

// AgentInfo is the information about the running Elastic Agent that the client is connected to.
Expand Down Expand Up @@ -79,15 +75,15 @@ type V2 interface {
Start(ctx context.Context) error
// Stop stops the connection to Elastic Agent.
Stop()
// UnitChanges returns channel client send unit change notifications to.
// Changes returns the channel the client send change notifications to.
//
// User of this client must read from this channel, or it will block the client.
UnitChanges() <-chan UnitChanged
Changes() <-chan Changes
// Errors returns channel of errors that occurred during communication.
//
// User of this client must read from this channel, or it will block the client.
Errors() <-chan error
// Artifacts returns the artifacts client.
// Artifacts returns the artifacts' client.
Artifacts() ArtifactsClient
// AgentInfo returns the information about the running Elastic Agent that the client is connected to.
//
Expand Down Expand Up @@ -118,11 +114,13 @@ type clientV2 struct {
cfgMu sync.RWMutex
obsMu sync.RWMutex

kickCh chan struct{}
errCh chan error
unitsCh chan UnitChanged
unitsMu sync.RWMutex
units []*Unit
kickCh chan struct{}
errCh chan error
changesCh chan Changes
featuresMu sync.Mutex
features *proto.Features
unitsMu sync.RWMutex
units []*Unit

dmx sync.RWMutex
diagHooks map[string]diagHook
Expand All @@ -144,9 +142,10 @@ func NewV2(target string, token string, versionInfo VersionInfo, opts ...grpc.Di
versionInfo: versionInfo,
kickCh: make(chan struct{}, 1),
errCh: make(chan error),
unitsCh: make(chan UnitChanged),
changesCh: make(chan Changes),
diagHooks: make(map[string]diagHook),
minCheckTimeout: CheckinMinimumTimeout,
features: &proto.Features{},
}
c.registerDefaultDiagnostics()
return c
Expand Down Expand Up @@ -178,9 +177,9 @@ func (c *clientV2) Stop() {
}
}

// UnitChanges returns channel client send unit change notifications to.
func (c *clientV2) UnitChanges() <-chan UnitChanged {
return c.unitsCh
// Changes returns channel client send unit change notifications to.
func (c *clientV2) Changes() <-chan Changes {
return c.changesCh
}

// Errors returns channel of errors that occurred during communication.
Expand Down Expand Up @@ -280,7 +279,7 @@ func (c *clientV2) checkinRoundTrip() {
}
c.agentInfoMu.Unlock()
}
c.syncUnits(expected)
c.sync(expected)
}
}()

Expand Down Expand Up @@ -354,7 +353,29 @@ func (c *clientV2) sendObserved(client proto.ElasticAgent_CheckinV2Client) error
return client.Send(msg)
}

// syncUnits syncs the expected units with the current state.
// sync syncs the expected state with the current state.
func (c *clientV2) sync(expected *proto.CheckinExpected) {
c.syncUnits(expected)

}
func (c *clientV2) syncFeatures(expected *proto.CheckinExpected) {
c.featuresMu.Lock()
defer c.featuresMu.Unlock()

if c.features.ConfigStateIdx != expected.Features.ConfigStateIdx {
c.features.ConfigStateIdx = expected.Features.ConfigStateIdx

if !gproto.Equal(c.features, expected.Features) {
c.features = expected.Features

c.changesCh <- Changes{
Type: ChangeFeatureModified,
Features: expected.Features,
}
}
}
}

func (c *clientV2) syncUnits(expected *proto.CheckinExpected) {
c.unitsMu.Lock()
defer c.unitsMu.Unlock()
Expand All @@ -365,13 +386,14 @@ func (c *clientV2) syncUnits(expected *proto.CheckinExpected) {
c.units[i] = unit
i++
} else {
c.unitsCh <- UnitChanged{
Type: UnitChangedRemoved,
c.changesCh <- Changes{
Type: ChangeUnitRemoved,
Unit: unit,
}
removed = true
}
}

// resize so units that no longer exist are removed from the slice
c.units = c.units[:i]
for _, agentUnit := range expected.Units {
Expand All @@ -383,24 +405,23 @@ func (c *clientV2) syncUnits(expected *proto.CheckinExpected) {
UnitType(agentUnit.Type),
UnitState(agentUnit.State),
UnitLogLevel(agentUnit.LogLevel),
expected.Features,
agentUnit.Config,
agentUnit.ConfigStateIdx, c)
agentUnit.ConfigStateIdx,
c)
c.units = append(c.units, unit)
c.unitsCh <- UnitChanged{
Type: UnitChangedAdded,
c.changesCh <- Changes{
Type: ChangeUnitAdded,
Unit: unit,
}
} else {
// existing unit
if unit.updateState(
UnitState(agentUnit.State),
UnitLogLevel(agentUnit.LogLevel),
expected.Features,
agentUnit.Config,
agentUnit.ConfigStateIdx) {
c.unitsCh <- UnitChanged{
Type: UnitChangedModified,
c.changesCh <- Changes{
Type: ChangeUnitModified,
Unit: unit,
}
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/client/client_v2_strings.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9a742b9

Please sign in to comment.