RSDK-10722: Prevent spawning infinite restart handlers when module exits quickly #4997
Changes from 23 commits
@@ -323,11 +323,11 @@ func (mgr *Manager) add(ctx context.Context, conf config.Module, moduleLogger lo | |
return nil | ||
} | ||
|
||
func (mgr *Manager) startModuleProcess(mod *module) error { | ||
func (mgr *Manager) startModuleProcess(mod *module, oue pexec.UnexpectedExitHandler) error { | ||
return mod.startProcess( | ||
mgr.restartCtx, | ||
mgr.parentAddr, | ||
mgr.newOnUnexpectedExitHandler(mod), | ||
oue, | ||
mgr.viamHomeDir, | ||
mgr.packagesDir, | ||
) | ||
|
@@ -353,7 +353,9 @@ func (mgr *Manager) startModule(ctx context.Context, mod *module) error { | |
ctx, "Waiting for module to complete startup and registration", "module", mod.cfg.Name, mod.logger) | ||
defer cleanup() | ||
|
||
if err := mgr.startModuleProcess(mod); err != nil { | ||
var restartCtx context.Context | ||
restartCtx, mod.restartCancel = context.WithCancel(mgr.restartCtx) | ||
if err := mgr.startModuleProcess(mod, mgr.newOnUnexpectedExitHandler(restartCtx, mod)); err != nil { | ||
return errors.WithMessage(err, "error while starting module "+mod.cfg.Name) | ||
} | ||
|
||
|
@@ -441,9 +443,8 @@ func (mgr *Manager) Remove(modName string) ([]resource.Name, error) { | |
|
||
handledResources := mod.resources | ||
|
||
// Always mark pendingRemoval even if there is no more work to do because the | ||
// restart handler checks it to avoid restarting a removed module. | ||
mod.pendingRemoval = true | ||
mod.restartCancel() | ||
|
||
// If module handles no resources, remove it now. | ||
if len(handledResources) == 0 { | ||
|
@@ -494,12 +495,11 @@ func (mgr *Manager) closeModule(mod *module, reconfigure bool) error { | |
|
||
mod.deregisterResources() | ||
|
||
mgr.rMap.Range(func(r resource.Name, m *module) bool { | ||
for r, m := range mgr.rMap.Range { | ||
Is this from the new Go custom range/iterator implementations?

Yes. I think an earlier version of these changes modified the code surrounding this loop, so it wasn't just a random drive-by syntax update. Happy to remove it if it's just cluttering the diff now.

Seems fine to me; I think it's much easier to read.
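As background for this exchange, here is a minimal sketch of Go 1.23+ range-over-function iteration using a toy map type (`tinyMap` is invented for illustration and is not the RDK's `rMap`). Any function or method value of the shape `func(yield func(K, V) bool)` can be ranged over directly, which is why the existing callback-style `Range` works unchanged in a `for ... range` loop:

```go
package main

import "fmt"

// tinyMap stands in for a concurrent map with a callback-style Range method,
// similar in shape to sync.Map.Range (names here are illustrative only).
type tinyMap struct {
	data map[string]int
}

// Range has the signature func(yield func(K, V) bool), so in Go 1.23+ it can
// be used directly as an iterator in a for-range statement.
func (m *tinyMap) Range(yield func(string, int) bool) {
	for k, v := range m.data {
		if !yield(k, v) {
			return // the loop body broke out of iteration
		}
	}
}

func main() {
	m := &tinyMap{data: map[string]int{"arm": 1, "camera": 2}}

	// Old style: pass a callback and return true to keep iterating.
	m.Range(func(k string, v int) bool {
		fmt.Println("callback:", k, v)
		return true
	})

	// New style: range over the method value itself.
	for k, v := range m.Range {
		fmt.Println("range-over-func:", k, v)
	}
}
```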
||
if m == mod { | ||
mgr.rMap.Delete(r) | ||
} | ||
return true | ||
}) | ||
} | ||
mgr.modules.Delete(mod.cfg.Name) | ||
|
||
mod.logger.Infow("Module successfully closed", "module", mod.cfg.Name) | ||
|
@@ -869,109 +869,101 @@ var oueRestartInterval = 5 * time.Second | |
|
||
// newOnUnexpectedExitHandler returns the appropriate OnUnexpectedExit function | ||
// for the passed-in module to include in the pexec.ProcessConfig. | ||
func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) pexec.UnexpectedExitHandler { | ||
func (mgr *Manager) newOnUnexpectedExitHandler(ctx context.Context, mod *module) pexec.UnexpectedExitHandler { | ||
return func(exitCode int) (continueAttemptingRestart bool) { | ||
// Log error immediately, as this is unexpected behavior. | ||
mod.logger.Errorw( | ||
"Module has unexpectedly exited.", "module", mod.cfg.Name, "exit_code", exitCode, | ||
) | ||
|
||
// Note that a module process can be restarted while preserving the same `module` object. And consider the case where a module is being | ||
// restarted due to a configuration change concurrently with the existing module process crashing. There are two acceptable | ||
// interleavings: | ||
// 1. The `onUnexpectedExitHandler` restarts the module process with the old configuration. | ||
// 1a) and the Reconfigure then shuts down + restarts the (freshly launched) module process with one using the updated configuration. | ||
// 2. Or, the `Reconfigure` executes and starts the module process with the updated config. The `onUnexpectedExitHandler` will still | ||
// run. But will become a no-op. | ||
// | ||
// For the second scenario, we check our assumptions after acquiring the modmanager mutex. If the module process is running, there is | ||
// nothing for us to do. | ||
mgr.mu.Lock() | ||
defer mgr.mu.Unlock() | ||
|
||
if mod.pendingRemoval { | ||
mod.logger.Infow("Module marked for removal, abandoning restart attempt") | ||
return | ||
} | ||
|
||
// Something else already started a new process while we were waiting on the | ||
// lock, so no restart is needed. | ||
if err := mod.process.Status(); err == nil { | ||
mod.logger.Infow("Module process already running, abandoning restart attempt") | ||
return | ||
// There are two relevant calls that may race with a crashing module: | ||
// 1. mgr.Remove, which wants to stop the module and remove it entirely | ||
// 2. mgr.Reconfigure, which wants to stop the module and replace it with | ||
// a new instance using a different configuration. | ||
// Both lock the manager mutex and then cancel the restart context for the | ||
// module. To avoid racing we lock the mutex and then check if the context | ||
// is cancelled, exiting early if so. If we win the race we may restart the | ||
// module and it will immediately shut down when we release the lock and | ||
// Remove/Reconfigure runs, which is acceptable. | ||
locked := false | ||
lock := func() { | ||
mgr.mu.Lock() | ||
locked = true | ||
} | ||
|
||
if err := mod.sharedConn.Close(); err != nil { | ||
mod.logger.Warnw("Error closing connection to crashed module. Continuing restart attempt", | ||
"error", err) | ||
unlock := func() { | ||
if locked { | ||
mgr.mu.Unlock() | ||
locked = false | ||
} | ||
} | ||
lock() | ||
defer unlock() | ||
|
||
If I understand correctly, the context can only be canceled by the goroutine holding the manager mutex? I think it'd be slightly better to always follow the lock-then-check order. Right now it looks like the context check is an optimization to bail and save time. But it's actually a correctness requirement to avoid double-starting.
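A minimal sketch of the ordering being discussed, using toy `manager`/`module` types rather than the real RDK ones: because Remove/Reconfigure cancel the module's restart context while holding the manager mutex, checking the context only after acquiring that mutex is what prevents a double start, not just an early bail-out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type module struct{ name string }

type manager struct {
	mu sync.Mutex
}

func (mgr *manager) restart(mod *module) { fmt.Println("restarting", mod.name) }

// onUnexpectedExit sketches the lock-then-check ordering: Remove/Reconfigure
// cancel restartCtx while holding mgr.mu, so a cancellation observed after we
// acquire the lock is definitive, and a restart started under the lock cannot
// race with a concurrent removal.
func (mgr *manager) onUnexpectedExit(restartCtx context.Context, mod *module) {
	mgr.mu.Lock()
	defer mgr.mu.Unlock()

	if restartCtx.Err() != nil {
		// The module was removed or is being reconfigured; abandon the restart.
		fmt.Println("restart abandoned for", mod.name)
		return
	}
	mgr.restart(mod)
}

func main() {
	mgr := &manager{}
	mod := &module{name: "acme-module"}

	restartCtx, restartCancel := context.WithCancel(context.Background())

	// Simulate Remove: cancel the module's restart context under the lock.
	mgr.mu.Lock()
	restartCancel()
	mgr.mu.Unlock()

	mgr.onUnexpectedExit(restartCtx, mod) // prints "restart abandoned for acme-module"
}
```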
||
// Enter a loop trying to restart the module every 5 seconds. If the | ||
// restart succeeds we return, this goroutine ends, and the management | ||
// goroutine started by the new module managedProcess handles any future | ||
// crashes. If the startup fails we kill the new process, its management | ||
// goroutine returns without doing anything, and we continue to loop until | ||
// we succeed or our context is cancelled. | ||
cleanupPerformed := false | ||
for { | ||
if err := ctx.Err(); err != nil { | ||
mod.logger.Infow("Restart context canceled, abandoning restart attempt", "err", err) | ||
return | ||
} | ||
|
||
if mgr.ftdc != nil { | ||
mgr.ftdc.Remove(mod.getFTDCName()) | ||
} | ||
if !cleanupPerformed { | ||
mod.cleanupAfterCrash(mgr) | ||
I assume it's a precondition that […]. I see […]. Given we have the manager mutex (which I assume prevents concurrent "create stuff"), do we need this […]? And if we pulled the […]? I'm just kind of weirded out about this important-sounding cleanup method invocation that I'm not particularly familiar with.
||
cleanupPerformed = true | ||
} | ||
|
||
// If attemptRestart returns any orphaned resource names, restart failed, | ||
// and we should remove orphaned resources. Since we handle process | ||
// restarting ourselves, return false here so goutils knows not to attempt | ||
// a process restart. | ||
if orphanedResourceNames := mgr.attemptRestart(mgr.restartCtx, mod); orphanedResourceNames != nil { | ||
if mgr.removeOrphanedResources != nil { | ||
mgr.removeOrphanedResources(mgr.restartCtx, orphanedResourceNames) | ||
mod.logger.Debugw( | ||
"Removed resources after failed module restart", | ||
"module", mod.cfg.Name, | ||
"resources", resource.NamesToStrings(orphanedResourceNames), | ||
) | ||
err := mgr.attemptRestart(ctx, mod) | ||
if err == nil { | ||
break | ||
} | ||
return | ||
unlock() | ||
utils.SelectContextOrWait(ctx, oueRestartInterval) | ||
lock() | ||
} | ||
mod.logger.Infow("Module successfully restarted, re-adding resources", "module", mod.cfg.Name) | ||
|
||
// Otherwise, add old module process' resources to new module; warn if new | ||
// module cannot handle old resource and remove it from mod.resources. | ||
// Finally, handle orphaned resources. | ||
var orphanedResourceNames []resource.Name | ||
var restoredResourceNamesStr []string | ||
for name, res := range mod.resources { | ||
// The `addResource` method might still be executing for this resource with a | ||
// read lock, so we execute it here with a write lock to make sure it doesn't | ||
// run concurrently. | ||
if _, err := mgr.addResource(mgr.restartCtx, res.conf, res.deps); err != nil { | ||
mod.logger.Warnw("Error while re-adding resource to module", | ||
"resource", name, "module", mod.cfg.Name, "error", err) | ||
mgr.rMap.Delete(name) | ||
|
||
mod.resourcesMu.Lock() | ||
delete(mod.resources, name) | ||
mod.resourcesMu.Unlock() | ||
|
||
confProto, err := config.ComponentConfigToProto(&res.conf) | ||
if err != nil { | ||
orphanedResourceNames = append(orphanedResourceNames, name) | ||
continue | ||
} | ||
_, err = mod.client.AddResource(ctx, &pb.AddResourceRequest{Config: confProto, Dependencies: res.deps}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Two questions:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not using Not removing resources: the high-level abstract answer is because we're still trying to restart the module, so until a reconfigure removes the module they're not orphaned. More concretely, so that clients who try to use resources on a crashed module won't see "this resource doesn't exist" and instead see "this resource exists but the connection failed", which more closely matches reality. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Adding on to @jmatth 's response Josh came to me with how orphaned resources with respect to the resource graph ought to behave in these module crashing states. And I correlated that to remote resources when connections to remotes go offline (the "will a video stream come back when the network heals" problem). I think my perspective was also the simpler one to code towards. But happy to go over this decision more thoughtfully. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your answers to both my questions make sense to me, so this seems totally fine. I had already assumed based on the code here we were moving toward the "treat crashed modules as downed remotes" strategy, so I'm happy to hear that that was an offline discussion already. |
||
if err != nil { | ||
orphanedResourceNames = append(orphanedResourceNames, name) | ||
continue | ||
} | ||
restoredResourceNamesStr = append(restoredResourceNamesStr, name.String()) | ||
} | ||
if len(orphanedResourceNames) > 0 && mgr.removeOrphanedResources != nil { | ||
orphanedResourceNamesStr := make([]string, len(orphanedResourceNames)) | ||
for _, n := range orphanedResourceNames { | ||
orphanedResourceNamesStr = append(orphanedResourceNamesStr, n.String()) | ||
} | ||
mod.logger.Warnw("Some modules failed to re-add after crashed module restart and will be removed", | ||
|
||
"module", mod.cfg.Name, | ||
"orphanedResources", orphanedResourceNamesStr) | ||
unlock() | ||
mgr.removeOrphanedResources(mgr.restartCtx, orphanedResourceNames) | ||
} | ||
|
||
mod.logger.Infow("Module resources successfully re-added after module restart", "module", mod.cfg.Name) | ||
mod.logger.Warnw("Module resources successfully re-added after module restart", | ||
"module", mod.cfg.Name, | ||
"resources", restoredResourceNamesStr) | ||
|
||
return | ||
} | ||
} | ||
|
||
// attemptRestart will attempt to restart the module up to three times and | ||
No longer true, right?
||
// return the names of now orphaned resources. | ||
func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.Name { | ||
// deregister crashed module's resources, and let later checkReady reset m.handles | ||
// before reregistering. | ||
mod.deregisterResources() | ||
|
||
var orphanedResourceNames []resource.Name | ||
for name := range mod.resources { | ||
orphanedResourceNames = append(orphanedResourceNames, name) | ||
} | ||
|
||
// Attempt to remove module's .sock file if module did not remove it | ||
// already. | ||
rutils.RemoveFileNoError(mod.addr) | ||
|
||
func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) error { | ||
var success, processRestarted bool | ||
defer func() { | ||
if !success { | ||
|
@@ -985,12 +977,6 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource. | |
} | ||
}() | ||
|
||
if ctx.Err() != nil { | ||
[q] Why remove this early exit from `attemptRestart`?

It's superfluous: we already check if the context was cancelled after we take the lock in the OUE handler that's calling this function.
||
mgr.logger.CInfow( | ||
ctx, "Will not attempt to restart crashed module", "module", mod.cfg.Name, "reason", ctx.Err().Error(), | ||
) | ||
return orphanedResourceNames | ||
} | ||
mgr.logger.CInfow(ctx, "Attempting to restart crashed module", "module", mod.cfg.Name) | ||
|
||
// No need to check mgr.untrustedEnv, as we're restarting the same | ||
|
@@ -1000,47 +986,39 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource. | |
ctx, "Waiting for module to complete restart and re-registration", "module", mod.cfg.Name, mod.logger) | ||
defer cleanup() | ||
|
||
// Attempt to restart module process 3 times. | ||
for attempt := 1; attempt < 4; attempt++ { | ||
if err := mgr.startModuleProcess(mod); err != nil { | ||
mgr.logger.Errorw("Error while restarting crashed module", "restart attempt", | ||
attempt, "module", mod.cfg.Name, "error", err) | ||
if attempt == 3 { | ||
// return early upon last attempt failure. | ||
return orphanedResourceNames | ||
} | ||
} else { | ||
break | ||
blockRestart := make(chan struct{}) | ||
@dgottlieb I know you didn't like this pattern last time, so I included an alternate implementation here: 7841737. Personally I prefer this because the code to modify the OUE behavior is in one contiguous block, while the other version involves a branch in a deferred call and manipulating state on the module struct. On the other hand, the other implementation builds on the existing context and lock usage rather than introducing another synchronization primitive, so 🤷. There's also a third option: rely on each chained OUE handler to perform the next restart attempt rather than looping in the first OUE, but I think the loop is more readable.

I think I understand the logic here, but the pattern that's being used here is a little hard for me to follow. I think what you have is fine, but could you leave a comment above L989 to give a little more context on how this works (blocking restart and returning `false`)?

@benjirewis I added a comment explaining the problem this is solving in the most recent commit. To expand on it a bit, consider the following code:

```go
func startProc() pexec.ManagedProcess {
	cfg := pexec.ProcessConfig{
		// ...
		OnUnexpectedExit: func(_ int) bool {
			startProc()
			return false
		},
	}
	return pexec.NewManagedProcess(cfg, logger)
}

proc := startProc()
// some other code
proc.Stop()
```

If the process crashes before you call `proc.Stop()`, the handler has already started a replacement process that the caller doesn't know about.

Awesome. The explanation here makes sense to me, and the comment you left is great. Thanks!
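To make the gating concrete, here is a runnable toy version of the pattern (no pexec; every name below is invented for illustration): the replacement process's exit handler parks on a channel until the restart loop has decided whether startup succeeded, so a crash during startup cannot kick off a second, competing restart path.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var handlerDone sync.WaitGroup

	success := false
	blockRestart := make(chan struct{})

	// Exit handler installed on the freshly started process. Even if the
	// process dies immediately, the handler cannot act until the startup
	// code below has settled `success` and released the gate.
	onUnexpectedExit := func(exitCode int) bool {
		defer handlerDone.Done()
		<-blockRestart
		if !success {
			// Startup never completed; the restart loop that launched this
			// process still owns retries, so this handler stays a no-op.
			fmt.Println("startup failed; exit handler does nothing")
			return false
		}
		fmt.Println("crash after successful startup; a real handler would restart here")
		return false
	}

	// Simulate a process that crashes during startup: its exit handler fires
	// concurrently with the startup checks below.
	handlerDone.Add(1)
	go onUnexpectedExit(1)

	// ... dial / checkReady equivalents would run here and fail ...
	success = false

	// Release the gate only after `success` is settled; closing the channel
	// orders that write before the handler's read.
	close(blockRestart)

	handlerDone.Wait()
}
```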
||
defer close(blockRestart) | ||
oue := func(exitCode int) bool { | ||
<-blockRestart | ||
if !success { | ||
return false | ||
} | ||
return mgr.newOnUnexpectedExitHandler(ctx, mod)(exitCode) | ||
} | ||
|
||
// Wait with a bit of backoff. Exit early if context has errorred. | ||
if !utils.SelectContextOrWait(ctx, time.Duration(attempt)*oueRestartInterval) { | ||
mgr.logger.CInfow( | ||
ctx, "Will not continue to attempt restarting crashed module", "module", mod.cfg.Name, "reason", ctx.Err().Error(), | ||
) | ||
return orphanedResourceNames | ||
} | ||
if err := mgr.startModuleProcess(mod, oue); err != nil { | ||
mgr.logger.Errorw("Error while restarting crashed module", | ||
"module", mod.cfg.Name, "error", err) | ||
return err | ||
} | ||
processRestarted = true | ||
|
||
if err := mod.dial(); err != nil { | ||
mgr.logger.CErrorw(ctx, "Error while dialing restarted module", | ||
"module", mod.cfg.Name, "error", err) | ||
return orphanedResourceNames | ||
return err | ||
} | ||
|
||
if err := mod.checkReady(ctx, mgr.parentAddr); err != nil { | ||
mgr.logger.CErrorw(ctx, "Error while waiting for restarted module to be ready", | ||
"module", mod.cfg.Name, "error", err) | ||
return orphanedResourceNames | ||
return err | ||
} | ||
|
||
if pc := mod.sharedConn.PeerConn(); mgr.modPeerConnTracker != nil && pc != nil { | ||
mgr.modPeerConnTracker.Add(mod.cfg.Name, pc) | ||
} | ||
|
||
mod.registerResources(mgr) | ||
|
||
success = true | ||
return nil | ||
} | ||
|
[nit] Could we maybe call this `moduleRestartCtx`? I like the idea of having module-specific restart contexts, but I just want to distinguish it from the manager-wide `restartCtx`.
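For context on the naming question, here is a minimal sketch (toy names, not the RDK types) of the relationship between the manager-wide context and the per-module one created in `startModule`: each module's restart context is derived from the manager's, so closing the manager cancels every module's restarts, while that module's cancel function (stored as `mod.restartCancel` in the diff) stops restarts for only that one module.

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	// Manager-wide restart context: cancelled when the whole manager closes.
	managerRestartCtx, managerCancel := context.WithCancel(context.Background())
	defer managerCancel()

	// Per-module restart context derived from the manager's (what the review
	// suggests calling moduleRestartCtx). Cancelled by Remove/Reconfigure.
	moduleRestartCtx, moduleRestartCancel := context.WithCancel(managerRestartCtx)

	// Removing or reconfiguring just this module stops only its restarts.
	moduleRestartCancel()
	fmt.Println("module ctx err:", moduleRestartCtx.Err())   // context.Canceled
	fmt.Println("manager ctx err:", managerRestartCtx.Err()) // <nil>
}
```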