RSDK-10722: Prevent spawning infinite restart handlers when module exits quickly #4997

Merged · 26 commits · May 29, 2025
Changes from 23 commits

Commits (26)
438bfa2
Avoid incorrectly reporting restart success for modules with no resou…
jmatth May 18, 2025
7af6801
Allow at most one module restart to trigger future restart attempts
jmatth May 19, 2025
756d470
Prevent deadlock when modmanager oue handler needs to lock parent object
jmatth May 19, 2025
00ef077
Infinitely attempt to restart crashed module
jmatth May 19, 2025
7c9698e
Fix tests that depended on 3 module restart attempts
jmatth May 19, 2025
ebfa482
Fix TestFTDCAfterModuleCrash
jmatth May 19, 2025
9283063
Skip TestFTDCAfterModuleCrash on non-Linux systems
jmatth May 20, 2025
71b81fa
Remove dead code
jmatth May 20, 2025
700af95
Update comment
jmatth May 20, 2025
1f761ef
Fix TestRestartModule
jmatth May 20, 2025
58ca1b3
Using context to abandon restart attempts when stopping a module process
jmatth May 23, 2025
091752e
Not removing crashed modules from the modmanager modMap, perform as m…
jmatth May 23, 2025
275155e
Allow Manager.Close to return crashed module error rather than deadlock
jmatth May 23, 2025
7243fdb
Allow crashed modules to be removed from modmanager
jmatth May 25, 2025
186b890
Handle crashed modules in original OUE handler rather than spawning a…
jmatth May 26, 2025
4026c6c
Add missing channel close, remove unnecessary timeout modification in…
jmatth May 27, 2025
7841737
Replacing blocking channel with child context
jmatth May 27, 2025
bb6cd42
Revert "Replacing blocking channel with child context"
jmatth May 27, 2025
e094021
Do not remove resources from the manager/graph while a module is cras…
jmatth May 27, 2025
c11d21b
Fix formatting
jmatth May 27, 2025
3e70bda
Fixing robot impl test
jmatth May 28, 2025
f1f9d14
Improve cleanup logic, comments
jmatth May 28, 2025
9fc2c1f
Fix test
jmatth May 28, 2025
0ddf24b
Fix comments and variable names in response to PR feedback
jmatth May 28, 2025
715d0ea
Move lock call to right before context check, add comments, adjust lo…
jmatth May 29, 2025
bb04291
Revert increased test timeouts
jmatth May 29, 2025
module/modmanager/manager.go (198 changes: 88 additions & 110 deletions)
@@ -323,11 +323,11 @@ func (mgr *Manager) add(ctx context.Context, conf config.Module, moduleLogger lo
return nil
}

func (mgr *Manager) startModuleProcess(mod *module) error {
func (mgr *Manager) startModuleProcess(mod *module, oue pexec.UnexpectedExitHandler) error {
return mod.startProcess(
mgr.restartCtx,
mgr.parentAddr,
mgr.newOnUnexpectedExitHandler(mod),
oue,
mgr.viamHomeDir,
mgr.packagesDir,
)
@@ -353,7 +353,9 @@ func (mgr *Manager) startModule(ctx context.Context, mod *module) error {
ctx, "Waiting for module to complete startup and registration", "module", mod.cfg.Name, mod.logger)
defer cleanup()

if err := mgr.startModuleProcess(mod); err != nil {
var restartCtx context.Context
Member:
[nit] Could we maybe call this moduleRestartCtx? I like the idea of having module-specific restart contexts, but I just want to distinguish it from the manager-wide restartCtx.
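
For readers following the discussion: the per-module restart context is derived from the manager-wide restartCtx, so closing the manager still cancels every module's context, while cancelling one module's context leaves the others alone. A minimal sketch of that parent/child relationship, using illustrative names only (managerCtx, modACtx, modBCtx are not identifiers from this PR):

package main

import (
	"context"
	"fmt"
)

func main() {
	// Manager-wide restart context; cancelled when the whole manager closes.
	managerCtx, managerCancel := context.WithCancel(context.Background())

	// Per-module restart contexts derived from the manager's context.
	modACtx, modACancel := context.WithCancel(managerCtx)
	modBCtx, modBCancel := context.WithCancel(managerCtx)
	defer modBCancel()

	// Removing/reconfiguring module A cancels only its own restart context.
	modACancel()
	fmt.Println("module A cancelled:", modACtx.Err() != nil) // true
	fmt.Println("module B cancelled:", modBCtx.Err() != nil) // false

	// Closing the manager cancels every remaining module context.
	managerCancel()
	fmt.Println("module B cancelled:", modBCtx.Err() != nil) // true
}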

restartCtx, mod.restartCancel = context.WithCancel(mgr.restartCtx)
if err := mgr.startModuleProcess(mod, mgr.newOnUnexpectedExitHandler(restartCtx, mod)); err != nil {
return errors.WithMessage(err, "error while starting module "+mod.cfg.Name)
}

@@ -441,9 +443,8 @@ func (mgr *Manager) Remove(modName string) ([]resource.Name, error) {

handledResources := mod.resources

// Always mark pendingRemoval even if there is no more work to do because the
// restart handler checks it to avoid restarting a removed module.
mod.pendingRemoval = true
mod.restartCancel()

// If module handles no resources, remove it now.
if len(handledResources) == 0 {
@@ -494,12 +495,11 @@ func (mgr *Manager) closeModule(mod *module, reconfigure bool) error {

mod.deregisterResources()

mgr.rMap.Range(func(r resource.Name, m *module) bool {
for r, m := range mgr.rMap.Range {
Member:
Is this from the new Go custom range/iterator implementations?

Member Author:
Yes. I think an earlier version of these changes modified the code surrounding this loop, so it wasn't just a random drive-by syntax update. Happy to remove it if it's just cluttering the diff now.

Member:
Seems fine to me; I think it's much easier to read.
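
For context: this loop compiles because Go 1.23 allows ranging directly over a function or method value with the iterator signature func(yield func(K, V) bool). A minimal sketch with a hypothetical Map type (not rdk's actual rMap implementation):

package main

import "fmt"

type Map struct {
	inner map[string]int
}

// Range has the single-argument yield signature, so on Go 1.23+ it can be
// invoked callback-style or used directly in a for-range statement.
func (m *Map) Range(yield func(string, int) bool) {
	for k, v := range m.inner {
		if !yield(k, v) {
			return
		}
	}
}

func main() {
	m := &Map{inner: map[string]int{"arm": 1, "camera": 2}}
	for name, id := range m.Range {
		fmt.Println(name, id)
	}
}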

if m == mod {
mgr.rMap.Delete(r)
}
return true
})
}
mgr.modules.Delete(mod.cfg.Name)

mod.logger.Infow("Module successfully closed", "module", mod.cfg.Name)
@@ -869,109 +869,101 @@ var oueRestartInterval = 5 * time.Second

// newOnUnexpectedExitHandler returns the appropriate OnUnexpectedExit function
// for the passed-in module to include in the pexec.ProcessConfig.
func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) pexec.UnexpectedExitHandler {
func (mgr *Manager) newOnUnexpectedExitHandler(ctx context.Context, mod *module) pexec.UnexpectedExitHandler {
return func(exitCode int) (continueAttemptingRestart bool) {
// Log error immediately, as this is unexpected behavior.
mod.logger.Errorw(
"Module has unexpectedly exited.", "module", mod.cfg.Name, "exit_code", exitCode,
)

// Note that a module process can be restarted while preserving the same `module` object. And consider the case where a module is being
// restarted due to a configuration change concurrently with the existing module process crashing. There are two acceptable
// interleavings:
// 1. The `onUnexpectedExitHandler` restarts the module process with the old configuration.
// 1a) and the Reconfigure then shuts down + restarts the (freshly launched) module process with one using the updated configuration.
// 2. Or, the `Reconfigure` executes and starts the module process with the updated config. The `onUnexpectedExitHandler` will still
// run. But will become a no-op.
//
// For the second scenario, we check our assumptions after acquiring the modmanager mutex. If the module process is running, there is
// nothing for us to do.
mgr.mu.Lock()
defer mgr.mu.Unlock()

if mod.pendingRemoval {
mod.logger.Infow("Module marked for removal, abandoning restart attempt")
return
}

// Something else already started a new process while we were waiting on the
// lock, so no restart is needed.
if err := mod.process.Status(); err == nil {
mod.logger.Infow("Module process already running, abandoning restart attempt")
return
// There are two relevant calls that may race with a crashing module:
// 1. mgr.Remove, which wants to stop the module and remove it entirely
// 2. mgr.Reconfigure, which wants to stop the module and replace it with
// a new instance using a different configuration.
// Both lock the manager mutex and then cancel the restart context for the
// module. To avoid racing we lock the mutex and then check if the context
// is cancelled, exiting early if so. If we win the race we may restart the
// module and it will immediately shut down when we release the lock and
// Remove/Reconfigure runs, which is acceptable.
locked := false
lock := func() {
mgr.mu.Lock()
locked = true
}

if err := mod.sharedConn.Close(); err != nil {
mod.logger.Warnw("Error closing connection to crashed module. Continuing restart attempt",
"error", err)
unlock := func() {
if locked {
mgr.mu.Unlock()
locked = false
}
}
lock()
defer unlock()

Member:
If I understand correctly, the context can only be canceled by the goroutine holding the manager mutex?

I think it'd be slightly better to always follow the lock call with a context check. I agree that's functionally what we have now. But I think having them literally back to back would be better.

Right now it looks like the context check is an optimization to bail and save time. But it's actually a correctness requirement to avoid double-starting.
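
To illustrate the ordering being requested, here is a minimal sketch using illustrative names only (manager, stop, maybeRestart are not identifiers from this PR). Because the cancelling side holds the same mutex when it cancels, checking the context immediately after acquiring the lock cannot race with Remove or Reconfigure:

package main

import (
	"context"
	"fmt"
	"sync"
)

type manager struct {
	mu     sync.Mutex
	cancel context.CancelFunc
}

// stop models Remove/Reconfigure: it cancels while holding the lock.
func (m *manager) stop() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.cancel()
}

// maybeRestart models the crash handler: lock, then check the context
// before doing anything that could start a second process.
func (m *manager) maybeRestart(ctx context.Context) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if ctx.Err() != nil {
		fmt.Println("abandoning restart: module was removed or reconfigured")
		return
	}
	fmt.Println("safe to restart here")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	m := &manager{cancel: cancel}
	m.maybeRestart(ctx) // safe to restart
	m.stop()
	m.maybeRestart(ctx) // abandons the attempt
}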

// Enter a loop trying to restart the module every 5 seconds. If the
// restart succeeds we return, this goroutine ends, and the management
// goroutine started by the new module managedProcess handles any future
// crashes. If the startup fails we kill the new process, its management
// goroutine returns without doing anything, and we continue to loop until
// we succeed or our context is cancelled.
cleanupPerformed := false
for {
if err := ctx.Err(); err != nil {
mod.logger.Infow("Restart context canceled, abandoning restart attempt", "err", err)
return
}

if mgr.ftdc != nil {
mgr.ftdc.Remove(mod.getFTDCName())
}
if !cleanupPerformed {
mod.cleanupAfterCrash(mgr)
Member:
I assume it's a precondition that cleanupAfterCrash is called before attemptRestart?

I see attemptRestart also calls cleanupAfterCrash when it errors.

Given we have the manager mutex (which I assume prevents concurrent "create stuff"), do we need this cleanupPerformed reference inside the for-loop? I imagine the goal is that we want to first check the context before cleaning things up?

And if we pulled the cleanupAfterCrash out before the for loop, we'd have to also pull out the context check?

I'm just kind of weirded out about this important sounding cleanup method invocation that I'm not particularly familiar with.

cleanupPerformed = true
}

// If attemptRestart returns any orphaned resource names, restart failed,
// and we should remove orphaned resources. Since we handle process
// restarting ourselves, return false here so goutils knows not to attempt
// a process restart.
if orphanedResourceNames := mgr.attemptRestart(mgr.restartCtx, mod); orphanedResourceNames != nil {
if mgr.removeOrphanedResources != nil {
mgr.removeOrphanedResources(mgr.restartCtx, orphanedResourceNames)
mod.logger.Debugw(
"Removed resources after failed module restart",
"module", mod.cfg.Name,
"resources", resource.NamesToStrings(orphanedResourceNames),
)
err := mgr.attemptRestart(ctx, mod)
if err == nil {
break
}
return
unlock()
utils.SelectContextOrWait(ctx, oueRestartInterval)
lock()
}
mod.logger.Infow("Module successfully restarted, re-adding resources", "module", mod.cfg.Name)

// Otherwise, add old module process' resources to new module; warn if new
// module cannot handle old resource and remove it from mod.resources.
// Finally, handle orphaned resources.
var orphanedResourceNames []resource.Name
var restoredResourceNamesStr []string
for name, res := range mod.resources {
// The `addResource` method might still be executing for this resource with a
// read lock, so we execute it here with a write lock to make sure it doesn't
// run concurrently.
if _, err := mgr.addResource(mgr.restartCtx, res.conf, res.deps); err != nil {
mod.logger.Warnw("Error while re-adding resource to module",
"resource", name, "module", mod.cfg.Name, "error", err)
mgr.rMap.Delete(name)

mod.resourcesMu.Lock()
delete(mod.resources, name)
mod.resourcesMu.Unlock()

confProto, err := config.ComponentConfigToProto(&res.conf)
if err != nil {
orphanedResourceNames = append(orphanedResourceNames, name)
continue
}
_, err = mod.client.AddResource(ctx, &pb.AddResourceRequest{Config: confProto, Dependencies: res.deps})
Member:
Two questions:

  • Why switch to mod.client.AddResource instead of using mgr.addResource?
  • Why not remove orphaned resources from mgr.rMap and mod.resources as we were doing before?

Member Author:
Not using mgr.addResource: primarily because mgr.addResource performs more work that we don't need to repeat: populating some maps and constructing a resource.Resource. In a crashed state we already did this, and users of Manager may have cached instances of resource.Resources that will start working again once the module is successfully restarted. Looking at it again, I don't think calling mgr.addResource would actually break anything, but it would waste time doing repeat work, instantiating a resource.Resource that will just immediately be GCed, and emitting log lines that may just be noise.

Not removing resources: the high-level answer is that we're still trying to restart the module, so until a reconfigure removes the module its resources aren't orphaned. More concretely, it's so that clients who try to use resources on a crashed module see "this resource exists but the connection failed" rather than "this resource doesn't exist", which more closely matches reality.

Member:
> Why not remove orphaned resources from mgr.rMap and mod.resources as we were doing before?

Adding on to @jmatth's response:

Josh came to me about how orphaned resources in the resource graph ought to behave in these module-crashing states, and I correlated that to remote resources when connections to remotes go offline (the "will a video stream come back when the network heals" problem).

I think my perspective was also the simpler one to code towards. But happy to go over this decision more thoughtfully.

Member:
Your answers to both my questions make sense to me, so this seems totally fine. I had already assumed based on the code here we were moving toward the "treat crashed modules as downed remotes" strategy, so I'm happy to hear that that was an offline discussion already.

if err != nil {
orphanedResourceNames = append(orphanedResourceNames, name)
continue
}
restoredResourceNamesStr = append(restoredResourceNamesStr, name.String())
}
if len(orphanedResourceNames) > 0 && mgr.removeOrphanedResources != nil {
orphanedResourceNamesStr := make([]string, len(orphanedResourceNames))
for _, n := range orphanedResourceNames {
orphanedResourceNamesStr = append(orphanedResourceNamesStr, n.String())
}
mod.logger.Warnw("Some modules failed to re-add after crashed module restart and will be removed",
"module", mod.cfg.Name,
"orphanedResources", orphanedResourceNamesStr)
unlock()
mgr.removeOrphanedResources(mgr.restartCtx, orphanedResourceNames)
}

mod.logger.Infow("Module resources successfully re-added after module restart", "module", mod.cfg.Name)
mod.logger.Warnw("Module resources successfully re-added after module restart",
"module", mod.cfg.Name,
"resources", restoredResourceNamesStr)
return
}
}

// attemptRestart will attempt to restart the module up to three times and
Member:
> up to three times

No longer true, right?

// return the names of now orphaned resources.
func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.Name {
// deregister crashed module's resources, and let later checkReady reset m.handles
// before reregistering.
mod.deregisterResources()

var orphanedResourceNames []resource.Name
for name := range mod.resources {
orphanedResourceNames = append(orphanedResourceNames, name)
}

// Attempt to remove module's .sock file if module did not remove it
// already.
rutils.RemoveFileNoError(mod.addr)

func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) error {
var success, processRestarted bool
defer func() {
if !success {
@@ -985,12 +977,6 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.
}
}()

if ctx.Err() != nil {
Member:
[q] Why remove this early exit from attemptRestart? Shouldn't we abandon the attempt if the ctx passed into attemptRestart has already errored?

Member Author:
It's superfluous: we already check if the context was cancelled after we take the lock in the OUE handler that's calling this function.

mgr.logger.CInfow(
ctx, "Will not attempt to restart crashed module", "module", mod.cfg.Name, "reason", ctx.Err().Error(),
)
return orphanedResourceNames
}
mgr.logger.CInfow(ctx, "Attempting to restart crashed module", "module", mod.cfg.Name)

// No need to check mgr.untrustedEnv, as we're restarting the same
@@ -1000,47 +986,39 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.
ctx, "Waiting for module to complete restart and re-registration", "module", mod.cfg.Name, mod.logger)
defer cleanup()

// Attempt to restart module process 3 times.
for attempt := 1; attempt < 4; attempt++ {
if err := mgr.startModuleProcess(mod); err != nil {
mgr.logger.Errorw("Error while restarting crashed module", "restart attempt",
attempt, "module", mod.cfg.Name, "error", err)
if attempt == 3 {
// return early upon last attempt failure.
return orphanedResourceNames
}
} else {
break
blockRestart := make(chan struct{})
Member Author:
@dgottlieb I know you didn't like this pattern last time, so I included an alternate implementation here: 7841737. Personally I prefer this because the code to modify the OUE behavior is in one contiguous block, while the other version involves a branch in a deferred call and manipulating state on the module struct. On the other hand, the other implementation builds on the existing context and lock usage rather than introducing another synchronization primitive, so 🤷.

There's also a third option to rely on each chained OUE handler to perform the next restart attempt rather than looping in the first OUE, but I think the loop is more readable.

Member:
I think I understand the logic here, but the pattern that's being used here is a little hard for me to follow. I think what you have is fine, but could you leave a comment above L989 to give a little more context on how this works (blocking restart and returning false on lack of success in particular).

Member Author:
@benjirewis I added a comment explaining the problem this is solving in the most recent commit. To expand on it a bit, consider the following code:

func startProc() pexec.ManagedProcess {
  cfg := pexec.ProcessConfig{
    // ...
    OnUnexpectedExit: func(_ int) bool {
      startProc()
      return false
    },
  }
  return pexec.NewManagedProcess(cfg, logger)
}

proc := startProc()
// some other code
proc.Stop()

If the process crashes before you call proc.Stop(), you've leaked a process, because the OUE handler created a new process that you don't have any reference to. Obviously this code is simplified, but it's representative of how the "two running module processes" bug occurred in the first place. The channel + wrapper function is one place this PR fixes the issue: the channel allows us to block the new OUE from executing until the surrounding function returns, and the function wrapping the real OUE returns early without attempting a restart if the call to attemptRestart ultimately returned an error.

Member:
Awesome. The explanation here makes sense to me, and the comment you left is great. Thanks!
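
To make the gating pattern concrete, here is a minimal sketch using a simplified stand-in for a managed process rather than the real pexec API (proc, restart, blockRestart, and succeeded are illustrative names). The new process's crash handler blocks on a channel until the enclosing restart attempt has finished and recorded whether it succeeded, so an immediate second crash cannot spawn a competing restart:

package main

import "fmt"

// proc is a simplified stand-in for a managed process; onCrash plays the role
// of pexec's OnUnexpectedExit handler.
type proc struct {
	onCrash func()
}

func restart() *proc {
	succeeded := false
	blockRestart := make(chan struct{})
	// Closing the channel on return releases the new process's crash handler
	// only after `succeeded` holds its final value.
	defer close(blockRestart)

	p := &proc{
		onCrash: func() {
			<-blockRestart // wait for the enclosing restart attempt to finish
			if !succeeded {
				// The enclosing attempt failed and will retry (or give up)
				// itself; restarting here too would leak a second process.
				return
			}
			fmt.Println("handling a crash of the successfully restarted process")
		},
	}

	// ... start the process, dial, check readiness ...
	succeeded = true
	return p
}

func main() {
	p := restart()
	p.onCrash() // simulate a crash after the restart attempt completed
}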

defer close(blockRestart)
oue := func(exitCode int) bool {
<-blockRestart
if !success {
return false
}
return mgr.newOnUnexpectedExitHandler(ctx, mod)(exitCode)
}

// Wait with a bit of backoff. Exit early if context has errorred.
if !utils.SelectContextOrWait(ctx, time.Duration(attempt)*oueRestartInterval) {
mgr.logger.CInfow(
ctx, "Will not continue to attempt restarting crashed module", "module", mod.cfg.Name, "reason", ctx.Err().Error(),
)
return orphanedResourceNames
}
if err := mgr.startModuleProcess(mod, oue); err != nil {
mgr.logger.Errorw("Error while restarting crashed module",
"module", mod.cfg.Name, "error", err)
return err
}
processRestarted = true

if err := mod.dial(); err != nil {
mgr.logger.CErrorw(ctx, "Error while dialing restarted module",
"module", mod.cfg.Name, "error", err)
return orphanedResourceNames
return err
}

if err := mod.checkReady(ctx, mgr.parentAddr); err != nil {
mgr.logger.CErrorw(ctx, "Error while waiting for restarted module to be ready",
"module", mod.cfg.Name, "error", err)
return orphanedResourceNames
return err
}

if pc := mod.sharedConn.PeerConn(); mgr.modPeerConnTracker != nil && pc != nil {
mgr.modPeerConnTracker.Add(mod.cfg.Name, pc)
}

mod.registerResources(mgr)

success = true
return nil
}