Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge "Avoid concurrent control actions for a process"

  • Loading branch information...
commit e926d4a015fd80f0b76cd244c03d1f1cf21d3db1 2 parents 37452a7 + 67cefe8
@dougm dougm authored Gerrit Code Review committed
View
5 configmanager.go
@@ -175,7 +175,10 @@ func (c *ConfigManager) ApplyDefaultSettings() {
if c.Settings == nil {
c.Settings = &Settings{}
}
- settings := c.Settings
+ c.Settings.ApplyDefaults()
+}
+
+func (settings *Settings) ApplyDefaults() {
if settings.AlertTransport == "" {
settings.AlertTransport = DEFAULT_ALERT_TRANSPORT
}
View
75 control.go
@@ -31,6 +31,10 @@ const (
processStarted
)
+const (
+ ERROR_IN_PROGRESS_FMT = "Process %q action already in progress"
+)
+
// So we can mock it in tests.
type EventMonitorInterface interface {
StartMonitoringProcess(process *Process)
@@ -58,6 +62,9 @@ type ProcessState struct {
Monitor int
MonitorLock sync.Mutex
Starts int
+
+ actionPending bool
+ actionPendingLock sync.Mutex
}
// XXX TODO needed for tests, a form of this should probably be in ConfigManager
@@ -145,7 +152,12 @@ func (c *Control) State(process *Process) *ProcessState {
}
procName := process.Name
if _, exists := c.States[procName]; !exists {
- c.States[procName] = &ProcessState{}
+ state := &ProcessState{}
+ c.States[procName] = state
+ if process.IsMonitoringModeActive() {
+ state.Monitor = MONITOR_INIT
+ }
+
}
return c.States[procName]
}
@@ -159,18 +171,24 @@ func (c *Control) RegisterEventMonitor(eventMonitor *EventMonitor) {
// Invoke given action for the given process and its
// dependents and/or dependencies
func (c *Control) DoAction(name string, action int) error {
- c.visits = make(map[string]*visitor)
-
process, err := c.Config().FindProcess(name)
if err != nil {
Log.Error(err.Error())
return err
}
+ return c.invoke(process, func() error {
+ return c.dispatchAction(process, action)
+ })
+}
+
+func (c *Control) dispatchAction(process *Process, action int) error {
+ c.visits = make(map[string]*visitor)
+
switch action {
case ACTION_START:
if process.IsRunning() {
- Log.Debugf("Process %q already running", name)
+ Log.Debugf("Process %q already running", process.Name)
c.monitorSet(process)
return nil
}
@@ -199,7 +217,7 @@ func (c *Control) DoAction(name string, action int) error {
c.doUnmonitor(process)
default:
- Log.Errorf("process %q -- invalid action: %d",
+ err := fmt.Errorf("process %q -- invalid action: %d",
process.Name, action)
return err
}
@@ -209,6 +227,17 @@ func (c *Control) DoAction(name string, action int) error {
return nil
}
+// do not allow more than one control action per process at the same time
+func (c *Control) invoke(process *Process, action func() error) error {
+ if c.isActionPending(process) {
+ return fmt.Errorf(ERROR_IN_PROGRESS_FMT, process.Name)
+ }
+ c.setActionPending(process, true)
+ defer c.setActionPending(process, false)
+
+ return action()
+}
+
// Start the given Process dependencies before starting Process
func (c *Control) doStart(process *Process) {
visitor := c.visitorOf(process)
@@ -244,6 +273,8 @@ func (c *Control) doStop(process *Process) bool {
}
visitor.stopped = true
+ c.monitorUnset(process)
+
if process.IsRunning() {
process.StopProcess()
if process.waitState(processStopped) != processStopped {
@@ -251,8 +282,6 @@ func (c *Control) doStop(process *Process) bool {
}
}
- c.monitorUnset(process)
-
return rv
}
@@ -322,6 +351,24 @@ func (c *Control) monitorSet(process *Process) {
}
}
+// for use by process watcher
+func (c *Control) monitorActivate(process *Process) bool {
+ state := c.State(process)
+ state.MonitorLock.Lock()
+ defer state.MonitorLock.Unlock()
+
+ if state.Monitor == MONITOR_NOT {
+ return false
+ }
+
+ if state.Monitor != MONITOR_YES {
+ state.Monitor = MONITOR_YES // INIT -> YES
+ Log.Infof("%q monitoring activated", process.Name)
+ }
+
+ return true
+}
+
func (c *Control) monitorUnset(process *Process) {
state := c.State(process)
state.MonitorLock.Lock()
@@ -332,6 +379,20 @@ func (c *Control) monitorUnset(process *Process) {
}
}
+func (c *Control) isActionPending(process *Process) bool {
+ state := c.State(process)
+ state.actionPendingLock.Lock()
+ defer state.actionPendingLock.Unlock()
+ return state.actionPending
+}
+
+func (c *Control) setActionPending(process *Process, actionPending bool) {
+ state := c.State(process)
+ state.actionPendingLock.Lock()
+ defer state.actionPendingLock.Unlock()
+ state.actionPending = actionPending
+}
+
func (c *Control) IsMonitoring(process *Process) bool {
state := c.State(process)
state.MonitorLock.Lock()
View
66 test/helper/helper.go
@@ -41,7 +41,7 @@ type ProcessInfo struct {
HasTty bool
}
-var TestProcess, goprocess string
+var TestProcess, goprocess, toplevel string
var MAX_GONIT_RETRIES int = 10
func CurrentProcessInfo() *ProcessInfo {
@@ -242,6 +242,20 @@ func NewTestProcess(name string, flags []string, detached bool) *Process {
}
}
+func CreateProcessGroupCfg(name string, dir string, pg *ProcessGroup) error {
+ yaml, err := goyaml.Marshal(pg)
+ if err != nil {
+ return err
+ }
+
+ file := filepath.Join(dir, name+"-gonit.yml")
+ if err := ioutil.WriteFile(file, yaml, 0666); err != nil {
+ return err
+ }
+
+ return nil
+}
+
func CreateGonitCfg(numProcesses int, pname string, writePath string,
procPath string, includeEvents bool) error {
pg := &ProcessGroup{}
@@ -288,19 +302,14 @@ func CreateGonitCfg(numProcesses int, pname string, writePath string,
processes[procName] = process
}
pg.Processes = processes
- if yaml, err := goyaml.Marshal(pg); err != nil {
- return err
- } else {
- gonitCfgPath := fmt.Sprintf("%v/%v-gonit.yml", writePath, pname)
- if err := ioutil.WriteFile(gonitCfgPath, yaml, 0666); err != nil {
- return err
- }
- }
- return nil
+ return CreateProcessGroupCfg(pname, writePath, pg)
}
-func CreateGonitSettings(gonitPidfile string, gonitDir string, procDir string) {
- logging := &LoggerConfig{Codec: "json"}
+func CreateGonitSettings(gonitPidfile string, gonitDir string, procDir string) *Settings {
+ logging := &LoggerConfig{
+ Codec: "json",
+ Level: "debug",
+ }
settings := &Settings{Logging: logging}
daemon := &Process{
Pidfile: gonitPidfile,
@@ -308,11 +317,13 @@ func CreateGonitSettings(gonitPidfile string, gonitDir string, procDir string) {
Name: "gonit",
}
settings.Daemon = daemon
+ settings.ApplyDefaults()
yaml, _ := goyaml.Marshal(settings)
err := ioutil.WriteFile(procDir+"/gonit.yml", yaml, 0666)
if err != nil {
log.Fatalf("WriteFile(%s): %v", procDir+"/gonit.yml", err)
}
+ return settings
}
// Read pid from a file.
@@ -320,14 +331,39 @@ func ProxyReadPidFile(path string) (int, error) {
return ReadPidFile(path)
}
+func findPath(path string, name string) (string, error) {
+ if _, err := os.Stat(path); err != nil {
+ return "", err
+ }
+ if _, err := os.Stat(filepath.Join(path, name)); err == nil {
+ return path, nil
+ }
+ return findPath(filepath.Join(path, ".."), name)
+}
+
+func topLevel() string {
+ if toplevel == "" {
+ dir, err := findPath(".", ".git")
+ if err != nil {
+ log.Fatal(err)
+ }
+ toplevel = dir
+ }
+ return toplevel
+}
+
// Given the path to a direcotry to build and given an optional output path,
// this will build the binary.
func BuildBin(path string, outputPath string) error {
- log.Printf("Building '%v'", path)
var output []byte
var err error
- output, err = exec.Command("go", "build", "-o", outputPath,
- path+"/main.go").Output()
+ if !filepath.IsAbs(path) {
+ path = filepath.Join(topLevel(), path)
+ }
+ path = filepath.Join(path, "main.go")
+ log.Printf("Building '%v'", path)
+ output, err =
+ exec.Command("go", "build", "-o", outputPath, path).Output()
if err != nil {
return fmt.Errorf("Error building bin '%v': %v", path, string(output))
}
View
163 test/integration/api_test.go
@@ -0,0 +1,163 @@
+// Copyright (c) 2012 VMware, Inc.
+
+package gonit_integration
+
+import (
+ "fmt"
+ "github.com/cloudfoundry/gonit"
+ "github.com/cloudfoundry/gonit/test/helper"
+ . "launchpad.net/gocheck"
+ "net/rpc"
+ "net/rpc/jsonrpc"
+ "os/exec"
+ "time"
+)
+
+type ApiIntSuite struct {
+ gonitCmd *exec.Cmd
+ dir string
+ procs map[string]*gonit.Process
+ settings *gonit.Settings
+ client *rpc.Client
+}
+
+const (
+ group = "api_test"
+ errorInProgressFmt = "ActionError: " + gonit.ERROR_IN_PROGRESS_FMT
+)
+
+var _ = Suite(&ApiIntSuite{})
+
+func (s *ApiIntSuite) addProcess(name string, flags []string) *gonit.Process {
+ process := helper.NewTestProcess(name, flags, true)
+ process.Description = name
+ process.MonitorMode = gonit.MONITOR_MODE_MANUAL
+ s.procs[process.Name] = process
+ return process
+}
+
+func (s *ApiIntSuite) SetUpSuite(c *C) {
+ s.procs = make(map[string]*gonit.Process)
+ s.addProcess("sleepy", []string{"-s", "1h"})
+ s.addProcess("dopey", []string{"-w", "5s", "-s", "1h"})
+ s.addProcess("grumpy", []string{"-x", "1", "-s", "1s"})
+
+ s.dir = c.MkDir()
+
+ s.settings = helper.CreateGonitSettings("", s.dir, s.dir)
+
+ helper.CreateProcessGroupCfg(group, s.dir,
+ &gonit.ProcessGroup{Processes: s.procs})
+
+ var err error
+ s.gonitCmd, _, err = helper.StartGonit(s.dir)
+ if err != nil {
+ c.Errorf(err.Error())
+ }
+
+ s.client, err = jsonrpc.Dial("unix", s.settings.RpcServerUrl)
+ if err != nil {
+ c.Errorf("rpc.Dial: %v", err)
+ }
+
+ pgs, err := s.statusGroup(group)
+ c.Assert(err, IsNil)
+ c.Assert(pgs.Group, HasLen, len(s.procs))
+ for _, ps := range pgs.Group {
+ c.Assert(ps.Summary.Running, Equals, false)
+ }
+}
+
+func (s *ApiIntSuite) TearDownSuite(c *C) {
+ s.client.Close()
+
+ if err := helper.StopGonit(s.gonitCmd, s.dir); err != nil {
+ c.Errorf(err.Error())
+ }
+
+ for _, process := range s.procs {
+ helper.Cleanup(process)
+ }
+}
+
+func (s *ApiIntSuite) TestControl(c *C) {
+ dopey := s.procs["dopey"]
+ grumpy := s.procs["grumpy"]
+ sleepy := s.procs["sleepy"]
+
+ result, err := s.startProcess(sleepy)
+ c.Check(err, IsNil)
+ c.Check(result.Total, Equals, 1)
+ c.Check(result.Errors, Equals, 0)
+
+ done := make(chan error)
+ go func() {
+ // takes a while to write pid file
+ _, err := s.startProcess(dopey)
+ done <- err
+ }()
+
+ // make sure above StartProcess is in action
+ time.Sleep(2 * time.Second)
+
+ // test we can get status while control action is running
+ status, err := s.statusProcess(dopey)
+ if c.Check(err, IsNil) {
+ c.Check(status.Summary.Running, Equals, false)
+ }
+
+ // get status for another process should be fine too
+ status, err = s.statusProcess(grumpy)
+ if c.Check(err, IsNil) {
+ c.Check(status.Summary.Running, Equals, false)
+ }
+
+ // control action in already progress; should fail
+ _, err = s.stopProcess(dopey)
+ msg := fmt.Sprintf(errorInProgressFmt, dopey.Name)
+ if c.Check(err, NotNil) {
+ c.Check(err.Error(), Equals, msg)
+ }
+
+ // but can control another process
+ _, err = s.startProcess(grumpy)
+ c.Check(err, IsNil)
+
+ err = <-done // waiting for dopey to start
+ c.Check(err, IsNil)
+
+ status, err = s.statusProcess(dopey)
+ if c.Check(err, IsNil) {
+ c.Check(status.Summary.Name, Equals, dopey.Name)
+ c.Check(status.Summary.Running, Equals, true)
+ }
+
+ status, err = s.statusProcess(sleepy)
+ if c.Check(err, IsNil) {
+ c.Check(status.Summary.Running, Equals, true)
+ }
+}
+
+func (s *ApiIntSuite) statusProcess(p *gonit.Process) (*gonit.ProcessStatus, error) {
+ status := &gonit.ProcessStatus{}
+ err := s.client.Call("API.StatusProcess", p.Name, status)
+ return status, err
+}
+
+func (s *ApiIntSuite) startProcess(p *gonit.Process) (*gonit.ActionResult, error) {
+ result := &gonit.ActionResult{}
+ err := s.client.Call("API.StartProcess", p.Name, result)
+ return result, err
+}
+
+func (s *ApiIntSuite) stopProcess(p *gonit.Process) (*gonit.ActionResult, error) {
+ result := &gonit.ActionResult{}
+ err := s.client.Call("API.StopProcess", p.Name, result)
+ return result, err
+}
+
+func (s *ApiIntSuite) statusGroup(group string) (*gonit.ProcessGroupStatus, error) {
+ pgs := &gonit.ProcessGroupStatus{}
+ err := s.client.Call("API.StatusGroup", group, pgs)
+ return pgs, err
+}
View
27 test/process/main.go
@@ -26,6 +26,7 @@ var (
name = flag.String("n", "test", "process name")
pidfile = flag.String("p", "test.pid", "process pid file")
sleep = flag.String("s", "10s", "sleep duration")
+ wait = flag.String("w", "", "start/stop wait duration")
exit = flag.Int("x", 0, "exit code")
)
@@ -100,14 +101,23 @@ func balloon() {
}
}
+func sleepDuration(name string, value string) time.Duration {
+ if value == "" {
+ return time.Duration(0)
+ }
+ duration, err := time.ParseDuration(value)
+ if err != nil {
+ log.Fatalf("Invalid %s '%s': %v", name, value, err)
+ }
+ return duration
+}
+
func main() {
flag.Parse()
log.SetFlags(log.Ltime | log.Lshortfile)
- pause, err := time.ParseDuration(*sleep)
- if err != nil {
- log.Fatalf("Invalid -s '%s': %v", *sleep, err)
- }
+ pause := sleepDuration("-s", *sleep)
+ waitTime := sleepDuration("-w", *wait)
cmd := flag.Args()[0]
switch cmd {
@@ -116,6 +126,10 @@ func main() {
forkme()
}
if *grand {
+ if *wait != "" {
+ fmt.Fprintf(os.Stdout, "Start (savePid) wait=%s\n", *wait)
+ time.Sleep(waitTime)
+ }
savePid()
go handleSignals()
}
@@ -131,10 +145,15 @@ func main() {
fmt.Fprintf(os.Stdout, "Stopped. [exit(%d)]\n", *exit)
os.Exit(*exit)
case "stop":
+ if *wait != "" {
+ fmt.Fprintf(os.Stdout, "Stop wait=%s\n", *wait)
+ time.Sleep(waitTime)
+ }
pid, err := gonit.ReadPidFile(*pidfile)
if err != nil {
log.Fatal(err)
}
+ fmt.Fprintf(os.Stdout, "Sending SIGTERM to pid=%d\n", pid)
err = syscall.Kill(pid, syscall.SIGTERM)
if err != nil {
log.Fatal(err)
Please sign in to comment.
Something went wrong with that request. Please try again.