Skip to content
This repository has been archived by the owner on Jan 26, 2022. It is now read-only.

Commit

Permalink
Avoid concurrent control actions for a process
Browse files Browse the repository at this point in the history
- api integration test

- ProcessState.Monitor defaults to MONITOR_INIT if Process.MonitorMode == "active"

- control: monitorUnset before calling StopProcess

- control: add monitorActivate helper

- test/helper enhancements

Change-Id: I6ab1d31653c7ae0107edd8de29eb155ae939ad61
  • Loading branch information
dougm committed Nov 14, 2012
1 parent a525a7a commit 67cefe8
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 27 deletions.
5 changes: 4 additions & 1 deletion configmanager.go
Expand Up @@ -175,7 +175,10 @@ func (c *ConfigManager) ApplyDefaultSettings() {
if c.Settings == nil {
c.Settings = &Settings{}
}
settings := c.Settings
c.Settings.ApplyDefaults()
}

func (settings *Settings) ApplyDefaults() {
if settings.AlertTransport == "" {
settings.AlertTransport = DEFAULT_ALERT_TRANSPORT
}
Expand Down
75 changes: 68 additions & 7 deletions control.go
Expand Up @@ -31,6 +31,10 @@ const (
processStarted
)

const (
ERROR_IN_PROGRESS_FMT = "Process %q action already in progress"
)

// So we can mock it in tests.
type EventMonitorInterface interface {
StartMonitoringProcess(process *Process)
Expand Down Expand Up @@ -58,6 +62,9 @@ type ProcessState struct {
Monitor int
MonitorLock sync.Mutex
Starts int

actionPending bool
actionPendingLock sync.Mutex
}

// XXX TODO needed for tests, a form of this should probably be in ConfigManager
Expand Down Expand Up @@ -145,7 +152,12 @@ func (c *Control) State(process *Process) *ProcessState {
}
procName := process.Name
if _, exists := c.States[procName]; !exists {
c.States[procName] = &ProcessState{}
state := &ProcessState{}
c.States[procName] = state
if process.IsMonitoringModeActive() {
state.Monitor = MONITOR_INIT
}

}
return c.States[procName]
}
Expand All @@ -159,18 +171,24 @@ func (c *Control) RegisterEventMonitor(eventMonitor *EventMonitor) {
// Invoke given action for the given process and its
// dependents and/or dependencies
func (c *Control) DoAction(name string, action int) error {
c.visits = make(map[string]*visitor)

process, err := c.Config().FindProcess(name)
if err != nil {
Log.Error(err.Error())
return err
}

return c.invoke(process, func() error {
return c.dispatchAction(process, action)
})
}

func (c *Control) dispatchAction(process *Process, action int) error {
c.visits = make(map[string]*visitor)

switch action {
case ACTION_START:
if process.IsRunning() {
Log.Debugf("Process %q already running", name)
Log.Debugf("Process %q already running", process.Name)
c.monitorSet(process)
return nil
}
Expand Down Expand Up @@ -199,7 +217,7 @@ func (c *Control) DoAction(name string, action int) error {
c.doUnmonitor(process)

default:
Log.Errorf("process %q -- invalid action: %d",
err := fmt.Errorf("process %q -- invalid action: %d",
process.Name, action)
return err
}
Expand All @@ -209,6 +227,17 @@ func (c *Control) DoAction(name string, action int) error {
return nil
}

// do not allow more than one control action per process at the same time
func (c *Control) invoke(process *Process, action func() error) error {
if c.isActionPending(process) {
return fmt.Errorf(ERROR_IN_PROGRESS_FMT, process.Name)
}
c.setActionPending(process, true)
defer c.setActionPending(process, false)

return action()
}

// Start the given Process dependencies before starting Process
func (c *Control) doStart(process *Process) {
visitor := c.visitorOf(process)
Expand Down Expand Up @@ -244,15 +273,15 @@ func (c *Control) doStop(process *Process) bool {
}
visitor.stopped = true

c.monitorUnset(process)

if process.IsRunning() {
process.StopProcess()
if process.waitState(processStopped) != processStopped {
rv = false
}
}

c.monitorUnset(process)

return rv
}

Expand Down Expand Up @@ -322,6 +351,24 @@ func (c *Control) monitorSet(process *Process) {
}
}

// for use by process watcher
func (c *Control) monitorActivate(process *Process) bool {
state := c.State(process)
state.MonitorLock.Lock()
defer state.MonitorLock.Unlock()

if state.Monitor == MONITOR_NOT {
return false
}

if state.Monitor != MONITOR_YES {
state.Monitor = MONITOR_YES // INIT -> YES
Log.Infof("%q monitoring activated", process.Name)
}

return true
}

func (c *Control) monitorUnset(process *Process) {
state := c.State(process)
state.MonitorLock.Lock()
Expand All @@ -332,6 +379,20 @@ func (c *Control) monitorUnset(process *Process) {
}
}

func (c *Control) isActionPending(process *Process) bool {
state := c.State(process)
state.actionPendingLock.Lock()
defer state.actionPendingLock.Unlock()
return state.actionPending
}

func (c *Control) setActionPending(process *Process, actionPending bool) {
state := c.State(process)
state.actionPendingLock.Lock()
defer state.actionPendingLock.Unlock()
state.actionPending = actionPending
}

func (c *Control) IsMonitoring(process *Process) bool {
state := c.State(process)
state.MonitorLock.Lock()
Expand Down
66 changes: 51 additions & 15 deletions test/helper/helper.go
Expand Up @@ -41,7 +41,7 @@ type ProcessInfo struct {
HasTty bool
}

var TestProcess, goprocess string
var TestProcess, goprocess, toplevel string
var MAX_GONIT_RETRIES int = 10

func CurrentProcessInfo() *ProcessInfo {
Expand Down Expand Up @@ -242,6 +242,20 @@ func NewTestProcess(name string, flags []string, detached bool) *Process {
}
}

func CreateProcessGroupCfg(name string, dir string, pg *ProcessGroup) error {
yaml, err := goyaml.Marshal(pg)
if err != nil {
return err
}

file := filepath.Join(dir, name+"-gonit.yml")
if err := ioutil.WriteFile(file, yaml, 0666); err != nil {
return err
}

return nil
}

func CreateGonitCfg(numProcesses int, pname string, writePath string,
procPath string, includeEvents bool) error {
pg := &ProcessGroup{}
Expand Down Expand Up @@ -288,46 +302,68 @@ func CreateGonitCfg(numProcesses int, pname string, writePath string,
processes[procName] = process
}
pg.Processes = processes
if yaml, err := goyaml.Marshal(pg); err != nil {
return err
} else {
gonitCfgPath := fmt.Sprintf("%v/%v-gonit.yml", writePath, pname)
if err := ioutil.WriteFile(gonitCfgPath, yaml, 0666); err != nil {
return err
}
}
return nil
return CreateProcessGroupCfg(pname, writePath, pg)
}

func CreateGonitSettings(gonitPidfile string, gonitDir string, procDir string) {
logging := &LoggerConfig{Codec: "json"}
func CreateGonitSettings(gonitPidfile string, gonitDir string, procDir string) *Settings {
logging := &LoggerConfig{
Codec: "json",
Level: "debug",
}
settings := &Settings{Logging: logging}
daemon := &Process{
Pidfile: gonitPidfile,
Dir: gonitDir,
Name: "gonit",
}
settings.Daemon = daemon
settings.ApplyDefaults()
yaml, _ := goyaml.Marshal(settings)
err := ioutil.WriteFile(procDir+"/gonit.yml", yaml, 0666)
if err != nil {
log.Fatalf("WriteFile(%s): %v", procDir+"/gonit.yml", err)
}
return settings
}

// Read pid from a file.
func ProxyReadPidFile(path string) (int, error) {
return ReadPidFile(path)
}

func findPath(path string, name string) (string, error) {
if _, err := os.Stat(path); err != nil {
return "", err
}
if _, err := os.Stat(filepath.Join(path, name)); err == nil {
return path, nil
}
return findPath(filepath.Join(path, ".."), name)
}

func topLevel() string {
if toplevel == "" {
dir, err := findPath(".", ".git")
if err != nil {
log.Fatal(err)
}
toplevel = dir
}
return toplevel
}

// Given the path to a direcotry to build and given an optional output path,
// this will build the binary.
func BuildBin(path string, outputPath string) error {
log.Printf("Building '%v'", path)
var output []byte
var err error
output, err = exec.Command("go", "build", "-o", outputPath,
path+"/main.go").Output()
if !filepath.IsAbs(path) {
path = filepath.Join(topLevel(), path)
}
path = filepath.Join(path, "main.go")
log.Printf("Building '%v'", path)
output, err =
exec.Command("go", "build", "-o", outputPath, path).Output()
if err != nil {
return fmt.Errorf("Error building bin '%v': %v", path, string(output))
}
Expand Down

0 comments on commit 67cefe8

Please sign in to comment.