Fix deadlocks in agent startup process (#2352)
* fix deadlocks in agent startup process

* fix error handling

* tinker with linter

* use multierror

* still fighting with linter

* try to format everything

* Revert "try to format everything"

This reverts commit 53dcdd0.

* add log statements, const

* spelling

* refactor shutdown, uncomment chan close

* bad merge

* add changelog

* Update changelog/fragments/1678471725-coordinator_deadlock_fix.yaml

Co-authored-by: Craig MacKenzie <craig.mackenzie@elastic.co>

* Update changelog/fragments/1678471725-coordinator_deadlock_fix.yaml

Co-authored-by: Craig MacKenzie <craig.mackenzie@elastic.co>

---------

Co-authored-by: Craig MacKenzie <craig.mackenzie@elastic.co>
fearful-symmetry and cmacknz committed Mar 15, 2023
1 parent 6ed1e14 commit fffe40a
Showing 4 changed files with 108 additions and 18 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1678471725-coordinator_deadlock_fix.yaml
@@ -0,0 +1,32 @@
+# Kind can be one of:
+# - breaking-change: a change to previously-documented behavior
+# - deprecation: functionality that is being removed in a later release
+# - bug-fix: fixes a problem in a previous version
+# - enhancement: extends functionality but does not break or fix existing behavior
+# - feature: new functionality
+# - known-issue: problems that we are aware of in a given version
+# - security: impacts on the security of a product or a user’s deployment.
+# - upgrade: important information for someone upgrading from a prior version
+# - other: does not fit into any of the other categories
+kind: bug-fix
+
+# Change summary; an 80ish-character description of the change.
+summary: Fix several possible deadlocks on shutdown.
+
+# Long description; in case the summary is not enough to describe the change,
+# this field accommodates a description without length limits.
+# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
+description: Fix a deadlock when the agent tries to bind to a port that is already in use. Do not wait for providers to shut down before exiting.
+
+# Affected component; a word indicating the component this changeset affects.
+component: coordinator
+
+# PR URL; optional; the PR number that added the changeset.
+# If not present, it is automatically filled by the tooling, which finds the PR where this changelog fragment was added.
+# NOTE: the tooling supports backports, so it's able to fill in the original PR number instead of the backport PR number.
+# Please provide it if you are adding a fragment for a different PR.
+pr: https://github.com/elastic/elastic-agent/pull/2352
+
+# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
+# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
+issue: https://github.com/elastic/elastic-agent/issues/2310
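The second sentence of this description maps to the periodic watcher change later in the diff: a config-change send that could block forever once the receiving side had shut down. Below is a minimal standalone Go sketch of that hazard and the select-on-ctx.Done() pattern that fixes it. This is not part of the commit; the notify helper and the message are illustrative only.

```go
// Sketch of the shutdown hazard: a bare send on an unbuffered channel blocks
// forever once the receiver has exited, while a send paired with ctx.Done()
// in a select unblocks as soon as the context is cancelled.
package main

import (
	"context"
	"fmt"
)

func notify(ctx context.Context, ch chan<- string, msg string) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // receiver is gone; give up instead of deadlocking
	case ch <- msg:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan string) // unbuffered, and nobody ever reads from it
	cancel()                // simulate shutdown: the consumer has already exited
	// A plain `ch <- "config change"` here would block forever.
	fmt.Println(notify(ctx, ch, "config change")) // prints "context canceled"
}
```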
79 changes: 66 additions & 13 deletions internal/pkg/agent/application/coordinator/coordinator.go
@@ -8,6 +8,9 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
+
+	"github.com/hashicorp/go-multierror"
 
 	"go.elastic.co/apm"
 	"gopkg.in/yaml.v2"
@@ -144,6 +147,9 @@ type VarsManager interface {
 // passing it into the components runtime manager.
 type ComponentsModifier func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error)
 
+// CoordinatorShutdownTimeout is how long the coordinator will wait during shutdown to receive a "clean" shutdown from other components.
+const CoordinatorShutdownTimeout = time.Second * 5
+
 // Coordinator manages the entire state of the Elastic Agent.
 //
 // All configuration changes, update variables, and upgrade actions are managed and controlled by the coordinator.
@@ -174,6 +180,9 @@ type Coordinator struct {
 	vars []*transpiler.Vars
 }
 
+// ErrFatalCoordinator is returned when a coordinator sub-component returns an error, as opposed to a simple context cancellation.
+var ErrFatalCoordinator = errors.New("fatal error in coordinator")
+
 // New creates a new coordinator.
 func New(logger *logger.Logger, logLevel logp.Level, agentInfo *info.AgentInfo, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capability, monitorMgr MonitorManager, isManaged bool, modifiers ...ComponentsModifier) *Coordinator {
 	var fleetState cproto.State
@@ -363,6 +372,10 @@ func (c *Coordinator) Run(ctx context.Context) error {
 				// do not restart
 				return err
 			}
+			if errors.Is(err, ErrFatalCoordinator) {
+				c.state.UpdateState(state.WithState(agentclient.Failed, "Fatal coordinator error"), state.WithFleetState(agentclient.Stopped, "Fatal coordinator error"))
+				return err
+			}
 		}
 		c.state.UpdateState(state.WithState(agentclient.Failed, fmt.Sprintf("Coordinator failed and will be restarted: %s", err)))
 		c.logger.Errorf("coordinator failed and will be restarted: %s", err)
@@ -519,19 +532,7 @@ func (c *Coordinator) runner(ctx context.Context) error {
 	for {
 		select {
 		case <-ctx.Done():
-			runtimeErr := <-runtimeErrCh
-			configErr := <-configErrCh
-			varsErr := <-varsErrCh
-			if runtimeErr != nil && !errors.Is(runtimeErr, context.Canceled) {
-				return runtimeErr
-			}
-			if configErr != nil && !errors.Is(configErr, context.Canceled) {
-				return configErr
-			}
-			if varsErr != nil && !errors.Is(varsErr, context.Canceled) {
-				return varsErr
-			}
-			return ctx.Err()
+			return c.handleCoordinatorDone(ctx, varsErrCh, runtimeErrCh, configErrCh)
 		case <-runtimeRun:
 			if ctx.Err() == nil {
 				cancel()
@@ -736,6 +737,58 @@ func (c *Coordinator) compute() (map[string]interface{}, []component.Component,
 	return cfg, comps, nil
 }
 
+func (c *Coordinator) handleCoordinatorDone(ctx context.Context, varsErrCh, runtimeErrCh, configErrCh chan error) error {
+	var runtimeErr error
+	var configErr error
+	var varsErr error
+	// in case other components are locked up, let us time out
+	timeoutWait := time.NewTimer(CoordinatorShutdownTimeout)
+waitLoop:
+	for {
+		select {
+		case <-timeoutWait.C:
+			var timeouts []string
+			if runtimeErr == nil {
+				timeouts = append(timeouts, "no response from runtime component")
+			}
+			if configErr == nil {
+				timeouts = append(timeouts, "no response from configWatcher component")
+			}
+			if varsErr == nil {
+				timeouts = append(timeouts, "no response from varsWatcher component")
+			}
+			c.logger.Debugf("timeout while waiting for other components to shut down: %v", timeouts)
+			break waitLoop // a bare break would only exit the select, not the loop
+		case err := <-runtimeErrCh:
+			runtimeErr = err
+		case err := <-configErrCh:
+			configErr = err
+		case err := <-varsErrCh:
+			varsErr = err
+		}
+		if runtimeErr != nil && configErr != nil && varsErr != nil {
+			timeoutWait.Stop()
+			break
+		}
+	}
+	// try not to lose any errors
+	var combinedErr error
+	if runtimeErr != nil && !errors.Is(runtimeErr, context.Canceled) {
+		combinedErr = multierror.Append(combinedErr, fmt.Errorf("runtime Manager: %w", runtimeErr))
+	}
+	if configErr != nil && !errors.Is(configErr, context.Canceled) {
+		combinedErr = multierror.Append(combinedErr, fmt.Errorf("config Manager: %w", configErr))
+	}
+	if varsErr != nil && !errors.Is(varsErr, context.Canceled) {
+		combinedErr = multierror.Append(combinedErr, fmt.Errorf("vars Watcher: %w", varsErr))
+	}
+	if combinedErr != nil {
+		return fmt.Errorf("%w: %s", ErrFatalCoordinator, combinedErr.Error()) //nolint:errorlint //errors.Is() won't work if we pass through the combined errors with %w
+	}
+	// if there are no component errors, continue to pass along the context error
+	return ctx.Err()
+}
+
 type coordinatorComponentLog struct {
 	ID    string `json:"id"`
 	State string `json:"state"`
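A note on the wrapping strategy in handleCoordinatorDone above: the ErrFatalCoordinator sentinel is attached with %w so Run can match it with errors.Is, while the multierror detail is deliberately flattened to a string with %s, which is what the //nolint:errorlint comment refers to. A minimal standalone sketch of that behavior follows; it is not from the commit, and the detail message is invented for illustration.

```go
// Sketch: a sentinel wrapped with %w survives errors.Is, while any error
// reduced to text via %s does not participate in the unwrap chain.
package main

import (
	"errors"
	"fmt"
)

var ErrFatalCoordinator = errors.New("fatal error in coordinator")

func main() {
	// Stand-in for the flattened multierror output.
	detail := errors.New("runtime Manager: bind: address already in use")
	err := fmt.Errorf("%w: %s", ErrFatalCoordinator, detail.Error())

	fmt.Println(errors.Is(err, ErrFatalCoordinator)) // true: the sentinel survives the wrap
	fmt.Println(errors.Is(err, detail))              // false: the detail was reduced to text
}
```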
13 changes: 9 additions & 4 deletions internal/pkg/agent/application/periodic.go
@@ -28,7 +28,7 @@ type periodic struct {
 }
 
 func (p *periodic) Run(ctx context.Context) error {
-	if err := p.work(); err != nil {
+	if err := p.work(ctx); err != nil {
 		return err
 	}
 
@@ -41,7 +41,7 @@ func (p *periodic) Run(ctx context.Context) error {
 		case <-t.C:
 		}
 
-		if err := p.work(); err != nil {
+		if err := p.work(ctx); err != nil {
 			return err
 		}
 	}
@@ -61,7 +61,7 @@ func (p *periodic) Watch() <-chan coordinator.ConfigChange {
 	return p.ch
 }
 
-func (p *periodic) work() error {
+func (p *periodic) work(ctx context.Context) error {
 	files, err := p.discover()
 	if err != nil {
 		return errors.New(err, "could not discover configuration files", errors.TypeConfig)
Expand Down Expand Up @@ -110,7 +110,12 @@ func (p *periodic) work() error {
p.watcher.Invalidate()
return err
}
p.ch <- &localConfigChange{cfg}
select {
case <-ctx.Done():
return ctx.Err()
case p.ch <- &localConfigChange{cfg}:
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/component/runtime/manager.go
@@ -147,7 +147,7 @@ func NewManager(
 func (m *Manager) Run(ctx context.Context) error {
 	lis, err := net.Listen("tcp", m.listenAddr)
 	if err != nil {
-		return err
+		return fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
 	}
 	m.netMx.Lock()
 	m.listener = lis
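The one-line manager.go change wraps the listen failure with %w instead of returning it bare, so the added context does not hide the root cause. The standalone sketch below shows a caller still matching the underlying errno through the wrap; it is not from the commit, the listen helper is illustrative, and the syscall.EADDRINUSE check is Unix-specific.

```go
// Sketch: wrapping a net.Listen error with %w preserves the cause, so
// errors.Is can still detect "address already in use" behind the new message.
package main

import (
	"errors"
	"fmt"
	"net"
	"syscall"
)

// listen mirrors the wrapping style used in the runtime manager's Run.
func listen(addr string) (net.Listener, error) {
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
	}
	return lis, nil
}

func main() {
	first, err := net.Listen("tcp", "127.0.0.1:0") // grab a random free port
	if err != nil {
		panic(err)
	}
	defer first.Close()

	_, err = listen(first.Addr().String()) // binding the same port again fails
	fmt.Println(err)
	fmt.Println(errors.Is(err, syscall.EADDRINUSE)) // true: %w preserved the cause
}
```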
