@@ -323,11 +323,11 @@ func (mgr *Manager) add(ctx context.Context, conf config.Module, moduleLogger lo
 	return nil
 }
 
-func (mgr *Manager) startModuleProcess(mod *module) error {
+func (mgr *Manager) startModuleProcess(mod *module, oue pexec.UnexpectedExitHandler) error {
 	return mod.startProcess(
 		mgr.restartCtx,
 		mgr.parentAddr,
-		mgr.newOnUnexpectedExitHandler(mod),
+		oue,
 		mgr.viamHomeDir,
 		mgr.packagesDir,
 	)
@@ -353,7 +353,9 @@ func (mgr *Manager) startModule(ctx context.Context, mod *module) error {
 		ctx, "Waiting for module to complete startup and registration", "module", mod.cfg.Name, mod.logger)
 	defer cleanup()
 
-	if err := mgr.startModuleProcess(mod); err != nil {
+	var moduleRestartCtx context.Context
+	moduleRestartCtx, mod.restartCancel = context.WithCancel(mgr.restartCtx)
+	if err := mgr.startModuleProcess(mod, mgr.newOnUnexpectedExitHandler(moduleRestartCtx, mod)); err != nil {
 		return errors.WithMessage(err, "error while starting module "+mod.cfg.Name)
 	}
 
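
The two hunks above move construction of the OUE (on-unexpected-exit) handler out of `startModuleProcess` and into the caller, which derives a per-module cancellable context from the manager-wide restart context. A minimal sketch of that handoff pattern, using hypothetical `Manager`/`module` types in place of the real ones:

```go
package main

import (
	"context"
	"fmt"
)

type module struct {
	name          string
	restartCancel context.CancelFunc
}

type Manager struct {
	restartCtx context.Context
}

// start derives a cancellable per-module context from the manager-wide
// restart context and returns it for use by the module's crash handler.
func (mgr *Manager) start(mod *module) context.Context {
	var ctx context.Context
	ctx, mod.restartCancel = context.WithCancel(mgr.restartCtx)
	return ctx
}

func main() {
	mgr := &Manager{restartCtx: context.Background()}
	mod := &module{name: "my-module"}
	ctx := mgr.start(mod)

	// Remove/Reconfigure cancel the module's context before stopping it...
	mod.restartCancel()

	// ...so a crash handler holding ctx observes the cancellation and gives up.
	fmt.Println(ctx.Err()) // context.Canceled
}
```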
@@ -441,9 +443,8 @@ func (mgr *Manager) Remove(modName string) ([]resource.Name, error) {
 
 	handledResources := mod.resources
 
-	// Always mark pendingRemoval even if there is no more work to do because the
-	// restart handler checks it to avoid restarting a removed module.
 	mod.pendingRemoval = true
+	mod.restartCancel()
 
 	// If module handles no resources, remove it now.
 	if len(handledResources) == 0 {
@@ -494,12 +495,11 @@ func (mgr *Manager) closeModule(mod *module, reconfigure bool) error {
 
 	mod.deregisterResources()
 
-	mgr.rMap.Range(func(r resource.Name, m *module) bool {
+	for r, m := range mgr.rMap.Range {
 		if m == mod {
 			mgr.rMap.Delete(r)
 		}
-		return true
-	})
+	}
 	mgr.modules.Delete(mod.cfg.Name)
 
 	mod.logger.Infow("Module successfully closed", "module", mod.cfg.Name)
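
The `closeModule` hunk swaps the callback form of `rMap.Range` for Go 1.23's range-over-func syntax: any function or method value of type `func(func(K, V) bool)` can be ranged over directly. A sketch with the standard library's `sync.Map`, whose `Range` has the same shape:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map
	m.Store("a", 1)
	m.Store("b", 2)

	// Callback style, as in the removed code: return true to keep iterating.
	m.Range(func(k, v any) bool {
		fmt.Println("callback:", k, v)
		return true
	})

	// Go 1.23+ range-over-func style, as in the added code. `break` replaces
	// `return false`, and the body reads like any other for-range loop.
	for k, v := range m.Range {
		fmt.Println("range:", k, v)
	}
}
```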
@@ -869,109 +869,107 @@ var oueRestartInterval = 5 * time.Second
 
 // newOnUnexpectedExitHandler returns the appropriate OnUnexpectedExit function
 // for the passed-in module to include in the pexec.ProcessConfig.
-func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) pexec.UnexpectedExitHandler {
+func (mgr *Manager) newOnUnexpectedExitHandler(ctx context.Context, mod *module) pexec.UnexpectedExitHandler {
 	return func(exitCode int) (continueAttemptingRestart bool) {
 		// Log error immediately, as this is unexpected behavior.
 		mod.logger.Errorw(
 			"Module has unexpectedly exited.", "module", mod.cfg.Name, "exit_code", exitCode,
 		)
 
-		// Note that a module process can be restarted while preserving the same `module` object. And consider the case where a module is being
-		// restarted due to a configuration change concurrently with the existing module process crashing. There are two acceptable
-		// interleavings:
-		// 1. The `onUnexpectedExitHandler` restarts the module process with the old configuration.
-		//    1a) and the Reconfigure then shuts down + restarts the (freshly launched) module process with one using the updated configuration.
-		// 2. Or, the `Reconfigure` executes and starts the module process with the updated config. The `onUnexpectedExitHandler` will still
-		//    run. But will become a no-op.
-		//
-		// For the second scenario, we check our assumptions after acquiring the modmanager mutex. If the module process is running, there is
-		// nothing for us to do.
-		mgr.mu.Lock()
-		defer mgr.mu.Unlock()
-
-		if mod.pendingRemoval {
-			mod.logger.Infow("Module marked for removal, abandoning restart attempt")
-			return
-		}
-
-		// Something else already started a new process while we were waiting on the
-		// lock, so no restart is needed.
-		if err := mod.process.Status(); err == nil {
-			mod.logger.Infow("Module process already running, abandoning restart attempt")
-			return
+		// There are two relevant calls that may race with a crashing module:
+		// 1. mgr.Remove, which wants to stop the module and remove it entirely.
+		// 2. mgr.Reconfigure, which wants to stop the module and replace it with
+		//    a new instance using a different configuration.
+		// Both lock the manager mutex and then cancel the restart context for the
+		// module. To avoid racing we lock the mutex and then check if the context
+		// is cancelled, exiting early if so. If we win the race we may restart the
+		// module and it will immediately shut down when we release the lock and
+		// Remove/Reconfigure runs, which is acceptable.
+		locked := false
+		lock := func() {
+			if !locked {
+				mgr.mu.Lock()
+				locked = true
+			}
 		}
-
-		if err := mod.sharedConn.Close(); err != nil {
-			mod.logger.Warnw("Error closing connection to crashed module. Continuing restart attempt",
-				"error", err)
+		unlock := func() {
+			if locked {
+				mgr.mu.Unlock()
+				locked = false
+			}
 		}
+		defer unlock()
+
+		// Enter a loop trying to restart the module every 5 seconds. If the
+		// restart succeeds we return, this goroutine ends, and the management
+		// goroutine started by the new module managedProcess handles any future
+		// crashes. If the startup fails we kill the new process, its management
+		// goroutine returns without doing anything, and we continue to loop until
+		// we succeed or our context is cancelled.
+		cleanupPerformed := false
+		for {
+			lock()
+			// It's possible the module has been removed or replaced while we were
+			// waiting on the lock. Check for a context cancellation to avoid double
+			// starting and/or leaking a module process.
+			if err := ctx.Err(); err != nil {
+				mod.logger.Infow("Restart context canceled, abandoning restart attempt", "err", err)
+				return
+			}
 
-		if mgr.ftdc != nil {
-			mgr.ftdc.Remove(mod.getFTDCName())
-		}
+			if !cleanupPerformed {
+				mod.cleanupAfterCrash(mgr)
+				cleanupPerformed = true
+			}
 
-		// If attemptRestart returns any orphaned resource names, restart failed,
-		// and we should remove orphaned resources. Since we handle process
-		// restarting ourselves, return false here so goutils knows not to attempt
-		// a process restart.
-		if orphanedResourceNames := mgr.attemptRestart(mgr.restartCtx, mod); orphanedResourceNames != nil {
-			if mgr.removeOrphanedResources != nil {
-				mgr.removeOrphanedResources(mgr.restartCtx, orphanedResourceNames)
-				mod.logger.Debugw(
-					"Removed resources after failed module restart",
-					"module", mod.cfg.Name,
-					"resources", resource.NamesToStrings(orphanedResourceNames),
-				)
+			err := mgr.attemptRestart(ctx, mod)
+			if err == nil {
+				break
 			}
-			return
+			unlock()
+			utils.SelectContextOrWait(ctx, oueRestartInterval)
 		}
 		mod.logger.Infow("Module successfully restarted, re-adding resources", "module", mod.cfg.Name)
 
-		// Otherwise, add old module process' resources to new module; warn if new
-		// module cannot handle old resource and remove it from mod.resources.
-		// Finally, handle orphaned resources.
 		var orphanedResourceNames []resource.Name
+		var restoredResourceNamesStr []string
 		for name, res := range mod.resources {
-			// The `addResource` method might still be executing for this resource with a
-			// read lock, so we execute it here with a write lock to make sure it doesn't
-			// run concurrently.
-			if _, err := mgr.addResource(mgr.restartCtx, res.conf, res.deps); err != nil {
-				mod.logger.Warnw("Error while re-adding resource to module",
-					"resource", name, "module", mod.cfg.Name, "error", err)
-				mgr.rMap.Delete(name)
-
-				mod.resourcesMu.Lock()
-				delete(mod.resources, name)
-				mod.resourcesMu.Unlock()
-
+			confProto, err := config.ComponentConfigToProto(&res.conf)
+			if err != nil {
 				orphanedResourceNames = append(orphanedResourceNames, name)
+				continue
 			}
+			_, err = mod.client.AddResource(ctx, &pb.AddResourceRequest{Config: confProto, Dependencies: res.deps})
+			if err != nil {
+				orphanedResourceNames = append(orphanedResourceNames, name)
+				continue
+			}
+			restoredResourceNamesStr = append(restoredResourceNamesStr, name.String())
 		}
 		if len(orphanedResourceNames) > 0 && mgr.removeOrphanedResources != nil {
+			orphanedResourceNamesStr := make([]string, 0, len(orphanedResourceNames))
+			for _, n := range orphanedResourceNames {
+				orphanedResourceNamesStr = append(orphanedResourceNamesStr, n.String())
+			}
+			mod.logger.Warnw("Some resources failed to re-add after crashed module restart and will be removed",
+				"module", mod.cfg.Name,
+				"orphanedResources", orphanedResourceNamesStr)
+			unlock()
 			mgr.removeOrphanedResources(mgr.restartCtx, orphanedResourceNames)
 		}
 
-		mod.logger.Infow("Module resources successfully re-added after module restart", "module", mod.cfg.Name)
+		mod.logger.Infow("Module resources successfully re-added after module restart",
+			"module", mod.cfg.Name,
+			"resources", restoredResourceNamesStr)
 		return
 	}
 }
 
-// attemptRestart will attempt to restart the module up to three times and
-// return the names of now orphaned resources.
-func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.Name {
-	// deregister crashed module's resources, and let later checkReady reset m.handles
-	// before reregistering.
-	mod.deregisterResources()
-
-	var orphanedResourceNames []resource.Name
-	for name := range mod.resources {
-		orphanedResourceNames = append(orphanedResourceNames, name)
-	}
-
-	// Attempt to remove module's .sock file if module did not remove it
-	// already.
-	rutils.RemoveFileNoError(mod.addr)
-
+// attemptRestart will attempt to restart the module process. It returns nil
+// on success and an error in case of failure. In the failure case it ensures
+// that the failed process is killed and will not be restarted by pexec or an
+// OUE handler.
+func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) error {
 	var success, processRestarted bool
 	defer func() {
 		if !success {
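
The rewritten handler above replaces the old single guarded restart with a retry loop, using idempotent `lock`/`unlock` closures so the manager mutex is held only while checking cancellation and attempting a restart, and is released during the inter-attempt wait. A condensed sketch of that loop shape; `restartLoop` and its parameters are hypothetical, and a plain `select` stands in for `utils.SelectContextOrWait` from go.viam.com/utils:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// restartLoop retries attempt until it succeeds or ctx is cancelled. The
// lock/unlock closures are idempotent, so one deferred unlock covers both
// the early-return and the break paths.
func restartLoop(ctx context.Context, mu *sync.Mutex, interval time.Duration, attempt func(context.Context) error) {
	locked := false
	lock := func() {
		if !locked {
			mu.Lock()
			locked = true
		}
	}
	unlock := func() {
		if locked {
			mu.Unlock()
			locked = false
		}
	}
	defer unlock()

	for {
		lock()
		if ctx.Err() != nil {
			return // removed/reconfigured while we waited on the lock
		}
		if attempt(ctx) == nil {
			break // success; keep holding the lock for post-restart work
		}
		unlock() // let Remove/Reconfigure make progress while we wait
		select {
		case <-ctx.Done():
		case <-time.After(interval):
		}
	}
	// ... re-add resources here, still under the lock ...
}

func main() {
	var mu sync.Mutex
	tries := 0
	restartLoop(context.Background(), &mu, 10*time.Millisecond, func(context.Context) error {
		tries++
		if tries < 3 {
			return errors.New("start failed")
		}
		return nil
	})
	fmt.Println("restarted after", tries, "attempts")
}
```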
@@ -985,12 +983,6 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.Name {
 		}
 	}()
 
-	if ctx.Err() != nil {
-		mgr.logger.CInfow(
-			ctx, "Will not attempt to restart crashed module", "module", mod.cfg.Name, "reason", ctx.Err().Error(),
-		)
-		return orphanedResourceNames
-	}
 	mgr.logger.CInfow(ctx, "Attempting to restart crashed module", "module", mod.cfg.Name)
 
 	// No need to check mgr.untrustedEnv, as we're restarting the same
@@ -1000,47 +992,46 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.Name {
 		ctx, "Waiting for module to complete restart and re-registration", "module", mod.cfg.Name, mod.logger)
 	defer cleanup()
 
-	// Attempt to restart module process 3 times.
-	for attempt := 1; attempt < 4; attempt++ {
-		if err := mgr.startModuleProcess(mod); err != nil {
-			mgr.logger.Errorw("Error while restarting crashed module", "restart attempt",
-				attempt, "module", mod.cfg.Name, "error", err)
-			if attempt == 3 {
-				// return early upon last attempt failure.
-				return orphanedResourceNames
-			}
-		} else {
-			break
+	// There is a potential race here where the process starts but then crashes,
+	// causing its OUE handler to spawn another restart attempt even though we've
+	// determined the startup to be a failure and want the new process to stay
+	// dead. To prevent this we wrap the new OUE handler in another function and
+	// block its execution until we have determined startup success or failure,
+	// at which point it exits early w/o attempting a restart on failure or
+	// continues with the normal OUE execution on success.
+	blockRestart := make(chan struct{})
+	defer close(blockRestart)
+	oue := func(exitCode int) bool {
+		<-blockRestart
+		if !success {
+			return false
 		}
+		return mgr.newOnUnexpectedExitHandler(ctx, mod)(exitCode)
+	}
 
-		// Wait with a bit of backoff. Exit early if context has errorred.
-		if !utils.SelectContextOrWait(ctx, time.Duration(attempt)*oueRestartInterval) {
-			mgr.logger.CInfow(
-				ctx, "Will not continue to attempt restarting crashed module", "module", mod.cfg.Name, "reason", ctx.Err().Error(),
-			)
-			return orphanedResourceNames
-		}
+	if err := mgr.startModuleProcess(mod, oue); err != nil {
+		mgr.logger.Errorw("Error while restarting crashed module",
+			"module", mod.cfg.Name, "error", err)
+		return err
 	}
 	processRestarted = true
 
 	if err := mod.dial(); err != nil {
		mgr.logger.CErrorw(ctx, "Error while dialing restarted module",
 			"module", mod.cfg.Name, "error", err)
-		return orphanedResourceNames
+		return err
 	}
 
 	if err := mod.checkReady(ctx, mgr.parentAddr); err != nil {
 		mgr.logger.CErrorw(ctx, "Error while waiting for restarted module to be ready",
 			"module", mod.cfg.Name, "error", err)
-		return orphanedResourceNames
+		return err
 	}
 
 	if pc := mod.sharedConn.PeerConn(); mgr.modPeerConnTracker != nil && pc != nil {
 		mgr.modPeerConnTracker.Add(mod.cfg.Name, pc)
 	}
 
-	mod.registerResources(mgr)
-
 	success = true
 	return nil
 }
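
The `blockRestart` channel in the hunk above is a small gating pattern: a callback that may fire at any moment is parked on a channel until the caller has decided whether startup succeeded, then either no-ops or proceeds. A self-contained sketch of the same idea with hypothetical names; closing the gate happens-before the blocked receive returns, so reading `success` in the callback is race-free:

```go
package main

import "fmt"

func main() {
	success := false
	gate := make(chan struct{})

	// onExit may be invoked by process machinery as soon as the child starts.
	// It blocks until the outcome of startup is known.
	onExit := func(exitCode int) bool {
		<-gate
		if !success {
			return false // startup was deemed a failure; stay dead
		}
		fmt.Println("running normal crash handling for exit code", exitCode)
		return true
	}

	// Simulate the new process crashing while startup checks are in flight.
	handled := make(chan bool)
	go func() { handled <- onExit(1) }()

	// ... dial/checkReady happen here; suppose they pass ...
	success = true
	close(gate) // the real code does this with `defer close(blockRestart)`

	fmt.Println("handler continued:", <-handled)
}
```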