Skip to content
This repository has been archived by the owner on Jan 26, 2022. It is now read-only.

Commit

Permalink
Fix group/all restarts with process dependencies
Browse files Browse the repository at this point in the history
- widen the scope of visitor table so processes dependencies are not
  restarted more than once during a group control action

- visitor table is now passed as a param (via type ControlAction)
  rather than shared (was a member of type Control)

- add integration tests

Change-Id: I58c0e167f70208ce07b5600e228227f20366adc7
  • Loading branch information
dougm committed Nov 30, 2012
1 parent 6d63c29 commit 045dc3a
Show file tree
Hide file tree
Showing 8 changed files with 415 additions and 69 deletions.
19 changes: 11 additions & 8 deletions api.go
Expand Up @@ -95,7 +95,7 @@ func NewAPI(config *ConfigManager) *API {

// *Process methods apply to a single service

func (c *Control) callAction(name string, r *ActionResult, action int) error {
func (c *Control) callAction(name string, r *ActionResult, action *ControlAction) error {
err := c.DoAction(name, action)

r.Total++
Expand All @@ -108,23 +108,23 @@ func (c *Control) callAction(name string, r *ActionResult, action int) error {
}

func (a *API) StartProcess(name string, r *ActionResult) error {
return a.Control.callAction(name, r, ACTION_START)
return a.Control.callAction(name, r, NewControlAction(ACTION_START))
}

func (a *API) StopProcess(name string, r *ActionResult) error {
return a.Control.callAction(name, r, ACTION_STOP)
return a.Control.callAction(name, r, NewControlAction(ACTION_STOP))
}

func (a *API) RestartProcess(name string, r *ActionResult) error {
return a.Control.callAction(name, r, ACTION_RESTART)
return a.Control.callAction(name, r, NewControlAction(ACTION_RESTART))
}

func (a *API) MonitorProcess(name string, r *ActionResult) error {
return a.Control.callAction(name, r, ACTION_MONITOR)
return a.Control.callAction(name, r, NewControlAction(ACTION_MONITOR))
}

func (a *API) UnmonitorProcess(name string, r *ActionResult) error {
return a.Control.callAction(name, r, ACTION_UNMONITOR)
return a.Control.callAction(name, r, NewControlAction(ACTION_UNMONITOR))
}

func (c *Control) processSummary(process *Process, summary *ProcessSummary) {
Expand Down Expand Up @@ -165,13 +165,15 @@ func (a *API) StatusProcess(name string, r *ProcessStatus) error {

// *Group methods apply to a service group

func (c *Control) groupAction(name string, r *ActionResult, action int) error {
func (c *Control) groupAction(name string, r *ActionResult, method int) error {
group, err := c.Config().FindGroup(name)

if err != nil {
return &ActionError{err}
}

action := NewGroupControlAction(method)

for name := range group.Processes {
c.callAction(name, r, action)
}
Expand Down Expand Up @@ -227,7 +229,8 @@ func (a *API) StatusGroup(name string, r *ProcessGroupStatus) error {

// *All methods apply to all services

func (c *Control) allAction(r *ActionResult, action int) error {
func (c *Control) allAction(r *ActionResult, method int) error {
action := NewGroupControlAction(method)
for _, processGroup := range c.Config().ProcessGroups {
for name, _ := range processGroup.Processes {
c.callAction(name, r, action)
Expand Down
106 changes: 66 additions & 40 deletions control.go
Expand Up @@ -26,6 +26,11 @@ const (
ACTION_UNMONITOR
)

const (
scopeDefault = iota
scopeRestartGroup
)

const (
processStopped = iota
processStarted
Expand All @@ -45,11 +50,16 @@ type EventMonitorInterface interface {
type Control struct {
ConfigManager *ConfigManager
EventMonitor EventMonitorInterface
visits map[string]*visitor
States map[string]*ProcessState
persistLock sync.Mutex
}

type ControlAction struct {
scope int
method int
visits map[string]*visitor
}

// flags to avoid invoking actions more than once
// as we traverse the dependency graph
type visitor struct {
Expand Down Expand Up @@ -138,7 +148,7 @@ func (c *ConfigManager) VisitProcesses(visit func(p *Process) bool) {
}
}

func (c *Control) visitorOf(process *Process) *visitor {
func (c *ControlAction) visitorOf(process *Process) *visitor {
if _, exists := c.visits[process.Name]; !exists {
c.visits[process.Name] = &visitor{}
}
Expand Down Expand Up @@ -168,9 +178,24 @@ func (c *Control) RegisterEventMonitor(eventMonitor *EventMonitor) {
c.EventMonitor = eventMonitor
}

func NewControlAction(method int) *ControlAction {
return &ControlAction{
method: method,
visits: make(map[string]*visitor),
}
}

func NewGroupControlAction(method int) *ControlAction {
action := NewControlAction(method)
if method == ACTION_RESTART {
action.scope = scopeRestartGroup
}
return action
}

// Invoke given action for the given process and its
// dependents and/or dependencies
func (c *Control) DoAction(name string, action int) error {
func (c *Control) DoAction(name string, action *ControlAction) error {
process, err := c.Config().FindProcess(name)
if err != nil {
Log.Error(err.Error())
Expand All @@ -182,39 +207,38 @@ func (c *Control) DoAction(name string, action int) error {
})
}

func (c *Control) dispatchAction(process *Process, action int) error {
c.visits = make(map[string]*visitor)
func (c *Control) dispatchAction(process *Process, action *ControlAction) error {

switch action {
switch action.method {
case ACTION_START:
if process.IsRunning() {
Log.Debugf("Process %q already running", process.Name)
c.monitorSet(process)
return nil
}
c.doDepend(process, ACTION_STOP)
c.doStart(process)
c.doDepend(process, ACTION_START)
c.doDepend(process, ACTION_STOP, action)
c.doStart(process, action)
c.doDepend(process, ACTION_START, action)

case ACTION_STOP:
c.doDepend(process, ACTION_STOP)
c.doStop(process)
c.doDepend(process, ACTION_STOP, action)
c.doStop(process, action)

case ACTION_RESTART:
c.doDepend(process, ACTION_STOP)
if c.doStop(process) {
c.doStart(process)
c.doDepend(process, ACTION_START)
c.doDepend(process, ACTION_STOP, action)
if c.doStop(process, action) {
c.doStart(process, action)
c.doDepend(process, ACTION_START, action)
} else {
c.monitorSet(process)
}

case ACTION_MONITOR:
c.doMonitor(process)
c.doMonitor(process, action)

case ACTION_UNMONITOR:
c.doDepend(process, ACTION_UNMONITOR)
c.doUnmonitor(process)
c.doDepend(process, ACTION_UNMONITOR, action)
c.doUnmonitor(process, action)

default:
err := fmt.Errorf("process %q -- invalid action: %d",
Expand All @@ -239,19 +263,21 @@ func (c *Control) invoke(process *Process, action func() error) error {
}

// Start the given Process dependencies before starting Process
func (c *Control) doStart(process *Process) {
visitor := c.visitorOf(process)
func (c *Control) doStart(process *Process, action *ControlAction) {
visitor := action.visitorOf(process)
if visitor.started {
return
}
visitor.started = true

for _, d := range process.DependsOn {
parent, err := c.Config().FindProcess(d)
if err != nil {
panic(err)
if action.scope != scopeRestartGroup {
for _, d := range process.DependsOn {
parent, err := c.Config().FindProcess(d)
if err != nil {
panic(err)
}
c.doStart(parent, action)
}
c.doStart(parent)
}

if !process.IsRunning() {
Expand All @@ -265,8 +291,8 @@ func (c *Control) doStart(process *Process) {

// Stop the given Process.
// Waits for process to stop or until Process.Timeout is reached.
func (c *Control) doStop(process *Process) bool {
visitor := c.visitorOf(process)
func (c *Control) doStop(process *Process, action *ControlAction) bool {
visitor := action.visitorOf(process)
var rv = true
if visitor.stopped {
return rv
Expand All @@ -286,8 +312,8 @@ func (c *Control) doStop(process *Process) bool {
}

// Enable monitoring for Process dependencies and given Process.
func (c *Control) doMonitor(process *Process) {
if c.visitorOf(process).started {
func (c *Control) doMonitor(process *Process, action *ControlAction) {
if action.visitorOf(process).started {
return
}

Expand All @@ -296,15 +322,15 @@ func (c *Control) doMonitor(process *Process) {
if err != nil {
panic(err)
}
c.doMonitor(parent)
c.doMonitor(parent, action)
}

c.monitorSet(process)
}

// Disable monitoring for the given Process
func (c *Control) doUnmonitor(process *Process) {
visitor := c.visitorOf(process)
func (c *Control) doUnmonitor(process *Process, action *ControlAction) {
visitor := action.visitorOf(process)
if visitor.stopped {
return
}
Expand All @@ -314,24 +340,24 @@ func (c *Control) doUnmonitor(process *Process) {
}

// Apply actions to processes that depend on the given Process
func (c *Control) doDepend(process *Process, action int) {
func (c *Control) doDepend(process *Process, method int, action *ControlAction) {
c.ConfigManager.VisitProcesses(func(child *Process) bool {
for _, dep := range child.DependsOn {
if dep == process.Name {
switch action {
switch method {
case ACTION_START:
c.doStart(child)
c.doStart(child, action)
case ACTION_MONITOR:
c.doMonitor(child)
c.doMonitor(child, action)
}

c.doDepend(child, action)
c.doDepend(child, method, action)

switch action {
switch method {
case ACTION_STOP:
c.doStop(child)
c.doStop(child, action)
case ACTION_UNMONITOR:
c.doUnmonitor(child)
c.doUnmonitor(child, action)
}
break
}
Expand Down
22 changes: 11 additions & 11 deletions control_test.go
Expand Up @@ -53,7 +53,7 @@ func (s *ControlSuite) TestActions(c *C) {
c.Check(MONITOR_NOT, Equals, ctl.State(process).Monitor)
c.Check(0, Equals, ctl.State(process).Starts)

rv := ctl.DoAction(name, ACTION_START)
rv := ctl.DoAction(name, NewControlAction(ACTION_START))
c.Check(1, Equals, fem.numStartMonitoringCalled)
c.Check(rv, IsNil)

Expand All @@ -62,18 +62,18 @@ func (s *ControlSuite) TestActions(c *C) {

c.Check(true, Equals, process.IsRunning())

rv = ctl.DoAction(name, ACTION_RESTART)
rv = ctl.DoAction(name, NewControlAction(ACTION_RESTART))
c.Check(2, Equals, fem.numStartMonitoringCalled)
c.Check(rv, IsNil)

c.Check(2, Equals, ctl.State(process).Starts)

rv = ctl.DoAction(name, ACTION_STOP)
rv = ctl.DoAction(name, NewControlAction(ACTION_STOP))
c.Check(rv, IsNil)

c.Check(MONITOR_NOT, Equals, ctl.State(process).Monitor)

rv = ctl.DoAction(name, ACTION_MONITOR)
rv = ctl.DoAction(name, NewControlAction(ACTION_MONITOR))
c.Check(3, Equals, fem.numStartMonitoringCalled)
c.Check(rv, IsNil)

Expand Down Expand Up @@ -111,13 +111,13 @@ func (s *ControlSuite) TestDepends(c *C) {
c.Check(err, IsNil)

// start main process
rv := ctl.DoAction(name, ACTION_START)
rv := ctl.DoAction(name, NewControlAction(ACTION_START))
c.Check(rv, IsNil)

c.Check(true, Equals, process.IsRunning())

// stop main process
rv = ctl.DoAction(name, ACTION_STOP)
rv = ctl.DoAction(name, NewControlAction(ACTION_STOP))
c.Check(rv, IsNil)
c.Check(false, Equals, process.IsRunning())

Expand Down Expand Up @@ -152,7 +152,7 @@ func (s *ControlSuite) TestDepends(c *C) {
// test start/stop of dependant

// start main sevice
rv = ctl.DoAction(name, ACTION_START)
rv = ctl.DoAction(name, NewControlAction(ACTION_START))
c.Check(rv, IsNil)
c.Check(true, Equals, process.IsRunning())
c.Check(2, Equals, ctl.State(process).Starts)
Expand All @@ -167,28 +167,28 @@ func (s *ControlSuite) TestDepends(c *C) {
}

// stop a dependency
rv = ctl.DoAction(process.DependsOn[0], ACTION_STOP)
rv = ctl.DoAction(process.DependsOn[0], NewControlAction(ACTION_STOP))
c.Check(rv, IsNil)

// dependent will also stop
c.Check(false, Equals, process.IsRunning())

// start a dependency
rv = ctl.DoAction(process.DependsOn[0], ACTION_START)
rv = ctl.DoAction(process.DependsOn[0], NewControlAction(ACTION_START))
c.Check(rv, IsNil)

// main process will come back up
c.Check(true, Equals, process.IsRunning())

ctl.DoAction(process.Name, ACTION_STOP)
ctl.DoAction(process.Name, NewControlAction(ACTION_STOP))

c.Check(3, Equals, ctl.State(process).Starts)

c.Check(MONITOR_NOT, Equals, ctl.State(process).Monitor)

// stop all dependencies
for _, dname := range process.DependsOn {
ctl.DoAction(dname, ACTION_STOP)
ctl.DoAction(dname, NewControlAction(ACTION_STOP))
}

// verify every process has been stopped
Expand Down

0 comments on commit 045dc3a

Please sign in to comment.