Permalink
Browse files

Start/stop/restart actions and main integration.

This change adds the ability to use start/stop/restart actions in
event rules. It also integrates the eventmonitor into the main code,
so it runs when gonit is running.

Change-Id: I617024ccbc83b02e0fb80c6eb34a7c4d0eac46c5
  • Loading branch information...
1 parent 8d7c5a5 commit cb0d39f504a83e641b53571138d721c873bb62b6 @lisbakke lisbakke committed Sep 4, 2012
Showing with 509 additions and 132 deletions.
  1. +25 −25 api.go
  2. +31 −3 configmanager.go
  3. +42 −20 control.go
  4. +14 −2 control_test.go
  5. +149 −59 eventmonitor.go
  6. +136 −3 eventmonitor_test.go
  7. +17 −7 gonit/main.go
  8. +43 −2 resourcemanager.go
  9. +30 −1 resourcemanager_test.go
  10. +0 −2 test/config/dashboard-gonit.yml
  11. +22 −8 test/process/main.go
View
50 api.go
@@ -11,7 +11,7 @@ import (
var notimpl = errors.New("Method not implemented")
type API struct {
- control *Control
+ Control *Control
}
type ProcessSummary struct {
@@ -64,7 +64,7 @@ func (e *ActionError) Error() string {
func NewAPI(config *ConfigManager) *API {
return &API{
- control: &Control{configManager: config},
+ Control: &Control{configManager: config},
}
}
@@ -83,23 +83,23 @@ func (c *Control) callAction(name string, r *ActionResult, action int) error {
}
func (a *API) StartProcess(name string, r *ActionResult) error {
- return a.control.callAction(name, r, ACTION_START)
+ return a.Control.callAction(name, r, ACTION_START)
}
func (a *API) StopProcess(name string, r *ActionResult) error {
- return a.control.callAction(name, r, ACTION_STOP)
+ return a.Control.callAction(name, r, ACTION_STOP)
}
func (a *API) RestartProcess(name string, r *ActionResult) error {
- return a.control.callAction(name, r, ACTION_RESTART)
+ return a.Control.callAction(name, r, ACTION_RESTART)
}
func (a *API) MonitorProcess(name string, r *ActionResult) error {
- return a.control.callAction(name, r, ACTION_MONITOR)
+ return a.Control.callAction(name, r, ACTION_MONITOR)
}
func (a *API) UnmonitorProcess(name string, r *ActionResult) error {
- return a.control.callAction(name, r, ACTION_UNMONITOR)
+ return a.Control.callAction(name, r, ACTION_UNMONITOR)
}
func (c *Control) processSummary(process *Process, summary *ProcessSummary) {
@@ -129,13 +129,13 @@ func (c *Control) processStatus(process *Process, status *ProcessStatus) error {
}
func (a *API) StatusProcess(name string, r *ProcessStatus) error {
- process, err := a.control.Config().FindProcess(name)
+ process, err := a.Control.Config().FindProcess(name)
if err != nil {
return err
}
- return a.control.processStatus(process, r)
+ return a.Control.processStatus(process, r)
}
// *Group methods apply to a service group
@@ -155,23 +155,23 @@ func (c *Control) groupAction(name string, r *ActionResult, action int) error {
}
func (a *API) StartGroup(name string, r *ActionResult) error {
- return a.control.groupAction(name, r, ACTION_START)
+ return a.Control.groupAction(name, r, ACTION_START)
}
func (a *API) StopGroup(name string, r *ActionResult) error {
- return a.control.groupAction(name, r, ACTION_STOP)
+ return a.Control.groupAction(name, r, ACTION_STOP)
}
func (a *API) RestartGroup(name string, r *ActionResult) error {
- return a.control.groupAction(name, r, ACTION_RESTART)
+ return a.Control.groupAction(name, r, ACTION_RESTART)
}
func (a *API) MonitorGroup(name string, r *ActionResult) error {
- return a.control.groupAction(name, r, ACTION_MONITOR)
+ return a.Control.groupAction(name, r, ACTION_MONITOR)
}
func (a *API) UnmonitorGroup(name string, r *ActionResult) error {
- return a.control.groupAction(name, r, ACTION_UNMONITOR)
+ return a.Control.groupAction(name, r, ACTION_UNMONITOR)
}
func (c *Control) groupStatus(group *ProcessGroup,
@@ -187,14 +187,14 @@ func (c *Control) groupStatus(group *ProcessGroup,
}
func (a *API) StatusGroup(name string, r *ProcessGroupStatus) error {
- group, err := a.control.Config().FindGroup(name)
+ group, err := a.Control.Config().FindGroup(name)
if err != nil {
return err
}
r.Name = name
- a.control.groupStatus(group, r)
+ a.Control.groupStatus(group, r)
return nil
}
@@ -211,40 +211,40 @@ func (c *Control) allAction(r *ActionResult, action int) error {
}
func (a *API) StartAll(unused interface{}, r *ActionResult) error {
- return a.control.allAction(r, ACTION_START)
+ return a.Control.allAction(r, ACTION_START)
}
func (a *API) StopAll(unused interface{}, r *ActionResult) error {
- return a.control.allAction(r, ACTION_STOP)
+ return a.Control.allAction(r, ACTION_STOP)
}
func (a *API) RestartAll(unused interface{}, r *ActionResult) error {
- return a.control.allAction(r, ACTION_RESTART)
+ return a.Control.allAction(r, ACTION_RESTART)
}
func (a *API) MonitorAll(unused interface{}, r *ActionResult) error {
- return a.control.allAction(r, ACTION_MONITOR)
+ return a.Control.allAction(r, ACTION_MONITOR)
}
func (a *API) UnmonitorAll(unused interface{}, r *ActionResult) error {
- return a.control.allAction(r, ACTION_UNMONITOR)
+ return a.Control.allAction(r, ACTION_UNMONITOR)
}
func (a *API) StatusAll(name string, r *ProcessGroupStatus) error {
r.Name = name
- for _, processGroup := range a.control.Config().ProcessGroups {
- a.control.groupStatus(processGroup, r)
+ for _, processGroup := range a.Control.Config().ProcessGroups {
+ a.Control.groupStatus(processGroup, r)
}
return nil
}
func (a *API) Summary(unused interface{}, s *Summary) error {
- for _, group := range a.control.Config().ProcessGroups {
+ for _, group := range a.Control.Config().ProcessGroups {
for _, process := range group.Processes {
summary := ProcessSummary{}
- a.control.processSummary(process, &summary)
+ a.Control.processSummary(process, &summary)
s.Processes = append(s.Processes, summary)
}
}
View
34 configmanager.go
@@ -63,15 +63,16 @@ type Process struct {
Description string
DependsOn []string
Actions map[string][]string
- // TODO How do we make it so Monitor is true by default and only false when
- // explicitly set in yaml?
- Monitor bool
+ MonitorMode string
}
const (
CONFIG_FILE_POSTFIX = "-gonit.yml"
SETTINGS_FILENAME = "gonit.yml"
UNIX_SOCKET_TRANSPORT = "unix_socket"
+ MONITOR_MODE_ACTIVE = "active"
+ MONITOR_MODE_PASSIVE = "passive"
+ MONITOR_MODE_MANUAL = "manual"
)
const (
@@ -217,12 +218,27 @@ func (c *ConfigManager) Parse(paths ...string) error {
log.Printf("No settings found, using defaults.")
}
c.applyDefaultSettings()
+ c.applyDefaultConfigOpts()
if err := c.validate(); err != nil {
return err
}
return nil
}
+func (c *ConfigManager) applyDefaultMonitorMode() {
+ for _, pg := range c.ProcessGroups {
+ for _, process := range pg.Processes {
+ if process.MonitorMode == "" {
+ process.MonitorMode = MONITOR_MODE_ACTIVE
+ }
+ }
+ }
+}
+
+func (c *ConfigManager) applyDefaultConfigOpts() {
+ c.applyDefaultMonitorMode()
+}
+
// Validates that certain fields exist in the config file.
func (pg ProcessGroup) validateRequiredFieldsExist() error {
for name, process := range pg.Processes {
@@ -283,3 +299,15 @@ func (c *ConfigManager) validate() error {
}
return nil
}
+
+func (p *Process) IsMonitoringModeActive() bool {
+ return p.MonitorMode == MONITOR_MODE_ACTIVE
+}
+
+func (p *Process) IsMonitoringModePassive() bool {
+ return p.MonitorMode == MONITOR_MODE_PASSIVE
+}
+
+func (p *Process) IsMonitoringModeManual() bool {
+ return p.MonitorMode == MONITOR_MODE_MANUAL
+}
View
62 control.go
@@ -5,6 +5,7 @@ package gonit
import (
"fmt"
"log"
+ "sync"
"time"
)
@@ -28,8 +29,14 @@ const (
processStarted
)
+// EventMonitorInterface abstracts the event monitor so it can be mocked in tests.
+type EventMonitorInterface interface {
+ StartMonitoringProcess(process *Process)
+}
+
type Control struct {
configManager *ConfigManager
+ EventMonitor EventMonitorInterface
visits map[string]*visitor
states map[string]*processState
}
@@ -43,8 +50,9 @@ type visitor struct {
// XXX TODO should state be attached to Process type?
type processState struct {
- Monitor int
- Starts int
+ Monitor int
+ MonitorLock sync.Mutex
+ Starts int
}
// XXX TODO needed for tests, a form of this should probably be in ConfigManager
@@ -69,6 +77,7 @@ func (c *ConfigManager) AddProcess(groupName string, process *Process) error {
return nil
}
+// BUG(lisbakke): If there are two processes named the same thing in different process groups, this could return the wrong process. ConfigManager should enforce unique group/process names.
// XXX TODO should probably be in configmanager.go
// Helper methods to find a Process by name
func (c *ConfigManager) FindProcess(name string) (*Process, error) {
@@ -137,6 +146,12 @@ func (c *Control) State(process *Process) *processState {
return c.states[process.Name]
}
+// Registers the event monitor with Control so that it can turn event monitoring
+// on/off when processes are started/stopped.
+func (c *Control) RegisterEventMonitor(eventMonitor *EventMonitor) {
+ c.EventMonitor = eventMonitor
+}
+
// Invoke given action for the given process and its
// dependents and/or dependencies
func (c *Control) DoAction(name string, action int) error {
@@ -155,19 +170,19 @@ func (c *Control) DoAction(name string, action int) error {
c.monitorSet(process)
return nil
}
- c.doDepend(process, ACTION_STOP, false)
+ c.doDepend(process, ACTION_STOP)
c.doStart(process)
- c.doDepend(process, ACTION_START, false)
+ c.doDepend(process, ACTION_START)
case ACTION_STOP:
- c.doDepend(process, ACTION_STOP, true)
- c.doStop(process, true)
+ c.doDepend(process, ACTION_STOP)
+ c.doStop(process)
case ACTION_RESTART:
- c.doDepend(process, ACTION_STOP, false)
- if c.doStop(process, false) {
+ c.doDepend(process, ACTION_STOP)
+ if c.doStop(process) {
c.doStart(process)
- c.doDepend(process, ACTION_START, false)
+ c.doDepend(process, ACTION_START)
} else {
c.monitorSet(process)
}
@@ -176,7 +191,7 @@ func (c *Control) DoAction(name string, action int) error {
c.doMonitor(process)
case ACTION_UNMONITOR:
- c.doDepend(process, ACTION_UNMONITOR, false)
+ c.doDepend(process, ACTION_UNMONITOR)
c.doUnmonitor(process)
default:
@@ -215,9 +230,8 @@ func (c *Control) doStart(process *Process) {
}
// Stop the given Process.
-// Monitoring is disabled when unmonitor flag is true.
// Waits for process to stop or until Process.Timeout is reached.
-func (c *Control) doStop(process *Process, unmonitor bool) bool {
+func (c *Control) doStop(process *Process) bool {
visitor := c.visitorOf(process)
var rv = true
if visitor.stopped {
@@ -232,9 +246,7 @@ func (c *Control) doStop(process *Process, unmonitor bool) bool {
}
}
- if unmonitor {
- c.monitorUnset(process)
- }
+ c.monitorUnset(process)
return rv
}
@@ -268,7 +280,7 @@ func (c *Control) doUnmonitor(process *Process) {
}
// Apply actions to processes that depend on the given Process
-func (c *Control) doDepend(process *Process, action int, unmonitor bool) {
+func (c *Control) doDepend(process *Process, action int) {
c.configManager.VisitProcesses(func(child *Process) bool {
for _, dep := range child.DependsOn {
if dep == process.Name {
@@ -279,11 +291,11 @@ func (c *Control) doDepend(process *Process, action int, unmonitor bool) {
c.doMonitor(child)
}
- c.doDepend(child, action, unmonitor)
+ c.doDepend(child, action)
switch action {
case ACTION_STOP:
- c.doStop(child, unmonitor)
+ c.doStop(child)
case ACTION_UNMONITOR:
c.doUnmonitor(child)
}
@@ -296,22 +308,32 @@ func (c *Control) doDepend(process *Process, action int, unmonitor bool) {
func (c *Control) monitorSet(process *Process) {
state := c.State(process)
-
+ state.MonitorLock.Lock()
+ defer state.MonitorLock.Unlock()
if state.Monitor == MONITOR_NOT {
state.Monitor = MONITOR_INIT
+ c.EventMonitor.StartMonitoringProcess(process)
log.Printf("%q monitoring enabled", process.Name)
}
}
func (c *Control) monitorUnset(process *Process) {
state := c.State(process)
-
+ state.MonitorLock.Lock()
+ defer state.MonitorLock.Unlock()
if state.Monitor != MONITOR_NOT {
state.Monitor = MONITOR_NOT
log.Printf("%q monitoring disabled", process.Name)
}
}
+func (c *Control) IsMonitoring(process *Process) bool {
+ state := c.State(process)
+ state.MonitorLock.Lock()
+ defer state.MonitorLock.Unlock()
+ return state.Monitor == MONITOR_INIT || state.Monitor == MONITOR_YES
+}
+
// Poll process for expected state change
func (p *Process) pollState(timeout time.Duration, expect int) bool {
isRunning := false
View
16 control_test.go
@@ -12,8 +12,17 @@ import (
var groupName = "controlTest"
+type FakeEventMonitor struct {
+ numStartMonitoringCalled int
+}
+
+func (fem *FakeEventMonitor) StartMonitoringProcess(process *Process) {
+ fem.numStartMonitoringCalled++
+}
+
func TestActions(t *testing.T) {
- c := &Control{}
+ fem := &FakeEventMonitor{}
+ c := &Control{EventMonitor: fem}
name := "simple"
process := helper.NewTestProcess(name, nil, false)
@@ -26,6 +35,7 @@ func TestActions(t *testing.T) {
assert.Equal(t, 0, c.State(process).Starts)
rv := c.DoAction(name, ACTION_START)
+ assert.Equal(t, 1, fem.numStartMonitoringCalled)
assert.Equal(t, nil, rv)
assert.Equal(t, MONITOR_INIT, c.State(process).Monitor)
@@ -34,6 +44,7 @@ func TestActions(t *testing.T) {
assert.Equal(t, true, process.IsRunning())
rv = c.DoAction(name, ACTION_RESTART)
+ assert.Equal(t, 2, fem.numStartMonitoringCalled)
assert.Equal(t, nil, rv)
assert.Equal(t, 2, c.State(process).Starts)
@@ -44,13 +55,14 @@ func TestActions(t *testing.T) {
assert.Equal(t, MONITOR_NOT, c.State(process).Monitor)
rv = c.DoAction(name, ACTION_MONITOR)
+ assert.Equal(t, 3, fem.numStartMonitoringCalled)
assert.Equal(t, nil, rv)
assert.Equal(t, MONITOR_INIT, c.State(process).Monitor)
}
func TestDepends(t *testing.T) {
- c := &Control{}
+ c := &Control{EventMonitor: &FakeEventMonitor{}}
name := "depsimple"
process := helper.NewTestProcess(name, nil, false)
View
208 eventmonitor.go
@@ -8,15 +8,17 @@ import (
"log"
"math"
"net"
+ "strings"
"time"
)
// TODO:
-// - Support more actions than alert.
// - Maybe consider changing the way rule checking works so that it timestamps
// the last time each rule was checked instead of the way it is?
// - Debug messages?
// - Support more than just unix socket for alerts.
+// - Move the parsing/validation logic from here to configmanager, since that's
+// a better fit.
// After configmanager gets the rules to be monitored, eventmonitor parses the
// rules and stores their data as ParsedEvent.
@@ -30,6 +32,7 @@ type ParsedEvent struct {
processName string
description string
interval time.Duration
+ action string
}
// The JSON message that is sent in alerts.
@@ -48,13 +51,25 @@ const (
DEFAULT_INTERVAL = "2s"
)
+var validActions = []string{"stop", "start", "restart", "alert"}
+
const (
EQ_OPERATOR = 0x1
NEQ_OPERATOR = 0x2
GT_OPERATOR = 0x3
LT_OPERATOR = 0x4
)
+// Returns whether or not the actionName is a valid action.
+func isValidAction(actionName string) bool {
+ for _, action := range validActions {
+ if actionName == action {
+ return true
+ }
+ }
+ return false
+}
+
// Returns whether a character is an operator character in an event rule.
func isAnOperatorChar(operatorChar string) bool {
return operatorChar == "<" || operatorChar == ">" || operatorChar == "=" ||
@@ -89,25 +104,39 @@ func checkRule(parsedEvent *ParsedEvent, resourceVal uint64) bool {
// resource values from resourcemanager. If any events trigger, it will take
// appropriate action.
type EventMonitor struct {
- alertEvents []*ParsedEvent
+ events []*ParsedEvent
resourceManager ResourceManager
configManager *ConfigManager
+ control ControlInterface
startTime int64
quitChan chan bool
}
+type ControlInterface interface {
+ DoAction(name string, action int) error
+ IsMonitoring(process *Process) bool
+}
+
+// Simple helper to make testing easier.
+func (e *EventMonitor) registerControl(control ControlInterface) {
+ e.control = control
+}
+
// Initializes the eventmonitor by parsing event rules and initializing data
// structures. The configmanager is where the events come from.
-func (e *EventMonitor) setup(configManager *ConfigManager) error {
+func (e *EventMonitor) setup(configManager *ConfigManager,
+ control *Control) error {
e.resourceManager = resourceManager
e.configManager = configManager
- e.alertEvents = []*ParsedEvent{}
+ e.registerControl(control)
+ e.events = []*ParsedEvent{}
for _, group := range e.configManager.ProcessGroups {
for _, process := range group.Processes {
for actionName, actions := range process.Actions {
for _, eventName := range actions {
event := group.EventByName(eventName)
- if err := e.loadEvent(event, group.Name, process); err != nil {
+ if err := e.loadEvent(event, group.Name, process,
+ actionName); err != nil {
return fmt.Errorf("Did not load rule '%v' on action '%v' because "+
"of error: '%v'.", eventName, actionName, err.Error())
}
@@ -120,13 +149,55 @@ func (e *EventMonitor) setup(configManager *ConfigManager) error {
return nil
}
+func (e *EventMonitor) printTriggeredMessage(event *ParsedEvent,
+ resourceVal uint64) {
+ log.Printf("'%v' triggered '%v' for '%v' (at '%v'). Executing '%v'.\n",
+ event.processName, event.ruleString, event.duration, resourceVal,
+ event.action)
+}
+
+func (e *EventMonitor) triggerAction(process *Process, event *ParsedEvent,
+ resourceVal uint64) error {
+ switch event.action {
+ case "stop":
+ if e.TriggerProcessActions(process) {
+ e.printTriggeredMessage(event, resourceVal)
+ return e.control.DoAction(event.processName, ACTION_STOP)
+ } else {
+ return nil
+ }
+ case "start":
+ if e.TriggerProcessActions(process) {
+ e.printTriggeredMessage(event, resourceVal)
+ return e.control.DoAction(event.processName, ACTION_START)
+ } else {
+ return nil
+ }
+ case "restart":
+ if e.TriggerProcessActions(process) {
+ e.printTriggeredMessage(event, resourceVal)
+ return e.control.DoAction(event.processName, ACTION_RESTART)
+ } else {
+ return nil
+ }
+ case "alert":
+ if e.TriggerAlerts(process) {
+ e.printTriggeredMessage(event, resourceVal)
+ return e.sendAlert(event)
+ } else {
+ return nil
+ }
+ }
+ return fmt.Errorf("No event action '%v' exists.", event.action)
+}
+
// Given a configmanager config, this function starts the eventmonitor on
// monitoring events and dispatching them.
-func (e *EventMonitor) Start(configManager *ConfigManager) error {
- if err := e.setup(configManager); err != nil {
+func (e *EventMonitor) Start(configManager *ConfigManager,
+ control *Control) error {
+ if err := e.setup(configManager, control); err != nil {
return err
}
-
go func() {
timeToWait := 1 * time.Second
ticker := time.NewTicker(timeToWait)
@@ -140,16 +211,16 @@ func (e *EventMonitor) Start(configManager *ConfigManager) error {
case <-ticker.C:
for _, group := range e.configManager.ProcessGroups {
for _, process := range group.Processes {
- // TODO change the GetPid to be a go routine that happens every X
- // seconds with a lock on it so we don't have to keep opening the
- // file.
- pid, err := process.Pid()
- if err != nil {
- log.Println("Could not get pid file for process '%v'. Error: %+v",
- process.Name, err)
- }
- if process.Monitor != false {
- e.checkRules(process.Name, pid)
+ if e.IsMonitoring(process) {
+ // TODO change the GetPid to be a go routine that happens every X
+ // seconds with a lock on it so we don't have to keep opening the
+ // file.
+ pid, err := process.Pid()
+ if err != nil {
+ log.Printf("Could not get pid file for process '%v'. Error: "+
+ "%+v\n", process.Name, err)
+ }
+ e.checkRules(process, pid)
}
}
}
@@ -167,50 +238,44 @@ func (e *EventMonitor) Stop() {
// Given a process and its pid, this will check all the rules associated with
// that process for this time period.
-func (e *EventMonitor) checkRules(processName string, pid int) {
+func (e *EventMonitor) checkRules(process *Process, pid int) {
+ processName := process.Name
diffTime := time.Now().Unix() - e.startTime
- // So we don't check the same thing more than once in a row.
- cachedResources := map[string]uint64{}
- for _, alertEvent := range e.alertEvents {
- interval := int64(alertEvent.interval.Seconds())
- if alertEvent.processName == processName &&
+ for _, event := range e.events {
+ interval := int64(event.interval.Seconds())
+ if event.processName == processName &&
(interval == 0 || diffTime%interval == 0) {
var resourceVal uint64
- // Use cache unless we must pull the resource.
- resourceVal, has_key := cachedResources[alertEvent.resourceName]
- if !has_key {
- var err error
- resourceVal, err = e.resourceManager.GetResource(alertEvent, pid)
- if err != nil {
- log.Print(err)
- continue
- }
- cachedResources[alertEvent.resourceName] = resourceVal
+ var err error
+ resourceVal, err = e.resourceManager.GetResource(event, pid)
+ if err != nil {
+ log.Print(err)
+ continue
}
- ruleTriggered := checkRule(alertEvent, resourceVal)
+ ruleTriggered := checkRule(event, resourceVal)
if ruleTriggered {
// TODO right now this can block the monitoring loop.
- if err := e.sendAlert(alertEvent); err != nil {
+ if err := e.triggerAction(process, event, resourceVal); err != nil {
log.Print(err)
}
}
}
}
+ e.resourceManager.ClearCachedResources()
}
// Given Events from ConfigManager, parses them and adds them to internal data
// so they can be monitored.
func (e *EventMonitor) loadEvent(event *Event, groupName string,
- process *Process) error {
- parsedEvent, err := e.parseEvent(event, groupName, process.Name)
+ process *Process, actionName string) error {
+ parsedEvent, err := e.parseEvent(event, groupName, process.Name, actionName)
if err != nil {
return err
}
-
if err = e.validateInterval(parsedEvent); err != nil {
return err
}
- e.alertEvents = append(e.alertEvents, parsedEvent)
+ e.events = append(e.events, parsedEvent)
return nil
}
@@ -282,7 +347,7 @@ func (e *EventMonitor) parseRule(rule string) (uint64, int, string, error) {
// Given an Event, parses the rule into amount, operator and resourceName, does
// a few other things, then returns a ParsedEvent ready to be monitored.
func (e *EventMonitor) parseEvent(event *Event, groupName string,
- processName string) (*ParsedEvent, error) {
+ processName string, actionName string) (*ParsedEvent, error) {
rule := event.Rule
ruleAmount, operator, resourceName, err := e.parseRule(rule)
if err != nil {
@@ -307,7 +372,13 @@ func (e *EventMonitor) parseEvent(event *Event, groupName string,
return nil, err
}
+ if !isValidAction(actionName) {
+ return nil, fmt.Errorf("No event action '%v' exists. Valid actions "+
+ "are [%+v].", actionName, strings.Join(validActions, ", "))
+ }
+
parsedEvent := &ParsedEvent{
+ action: actionName,
operator: operator,
ruleAmount: ruleAmount,
resourceName: resourceName,
@@ -321,43 +392,42 @@ func (e *EventMonitor) parseEvent(event *Event, groupName string,
return parsedEvent, nil
}
-// Sends an alerts.
+// Sends an alert.
func (e *EventMonitor) sendAlert(parsedEvent *ParsedEvent) error {
settings := e.configManager.Settings
if settings.AlertTransport == UNIX_SOCKET_TRANSPORT {
- if err := e.sendUnixSocketAlert(parsedEvent, settings.SocketFile); err != nil {
+ if err := e.sendUnixSocketAlert(parsedEvent,
+ settings.SocketFile); err != nil {
return err
}
- } else {
- log.Printf("Rule '%v' for process '%v' has triggered for > %v seconds.\n",
- parsedEvent.ruleString, parsedEvent.processName, parsedEvent.duration)
}
return nil
}
func (e *EventMonitor) validateInterval(parsedEvent *ParsedEvent) error {
- for _, event := range e.alertEvents {
+ for _, event := range e.events {
if event.processName == parsedEvent.processName &&
event.resourceName == parsedEvent.resourceName {
if event.interval != parsedEvent.interval {
return fmt.Errorf("Two rules ('%v' and '%v') on '%v' have different "+
"poll intervals for the same resource '%v'.", event.ruleString,
parsedEvent.ruleString, event.processName, event.resourceName)
}
- durationRatio := event.duration.Seconds() / event.interval.Seconds()
- if event.resourceName == CPU_PERCENT_NAME &&
- (event.duration.Seconds()/event.interval.Seconds()) <= 1 {
- return fmt.Errorf("Rule '%v' duration / interval must be greater "+
- "than 1. It is '%+v / %+v'.", event.ruleString,
- event.duration.Seconds(), event.interval.Seconds())
- }
-
- if math.Mod(durationRatio, 1.0) != 0.0 {
- return fmt.Errorf("Rule '%v' duration / interval must be an integer.",
- event.ruleString)
- }
}
}
+ durationRatio := parsedEvent.duration.Seconds() /
+ parsedEvent.interval.Seconds()
+ if parsedEvent.resourceName == CPU_PERCENT_NAME &&
+ (parsedEvent.duration.Seconds()/parsedEvent.interval.Seconds()) <= 1 {
+ return fmt.Errorf("Rule '%v' duration / interval must be greater "+
+ "than 1. It is '%+v / %+v'.", parsedEvent.ruleString,
+ parsedEvent.duration.Seconds(), parsedEvent.interval.Seconds())
+ }
+
+ if math.Mod(durationRatio, 1.0) != 0.0 {
+ return fmt.Errorf("Rule '%v' duration / interval must be an integer.",
+ parsedEvent.ruleString)
+ }
return nil
}
@@ -388,3 +458,23 @@ func (e *EventMonitor) sendUnixSocketAlert(parsedEvent *ParsedEvent,
}
return nil
}
+
+func (e *EventMonitor) CleanDataForProcess(p *Process) {
+ e.resourceManager.CleanDataForProcess(p)
+}
+
+func (e *EventMonitor) IsMonitoring(p *Process) bool {
+ return e.control.IsMonitoring(p) && !p.IsMonitoringModeManual()
+}
+
+func (e *EventMonitor) StartMonitoringProcess(p *Process) {
+ e.CleanDataForProcess(p)
+}
+
+func (e *EventMonitor) TriggerAlerts(p *Process) bool {
+ return p.IsMonitoringModeActive() || p.IsMonitoringModePassive()
+}
+
+func (e *EventMonitor) TriggerProcessActions(p *Process) bool {
+ return p.IsMonitoringModeActive()
+}
View
139 eventmonitor_test.go
@@ -15,6 +15,12 @@ func init() {
eventMonitor = EventMonitor{}
}
+func RegisterNewFakeControl() *FakeControl {
+ fc := &FakeControl{}
+ eventMonitor.registerControl(fc)
+ return fc
+}
+
func TestIsAnOperatorChar(t *testing.T) {
assert.Equal(t, true, isAnOperatorChar("="))
assert.Equal(t, true, isAnOperatorChar(">"))
@@ -132,7 +138,7 @@ func TestParseEvent(t *testing.T) {
Description: "The best rule ever!",
}
parsedEvent, err :=
- eventMonitor.parseEvent(&event, "GroupName", "ProcessName")
+ eventMonitor.parseEvent(&event, "GroupName", "ProcessName", "alert")
if err != nil {
t.Fatal(err)
}
@@ -154,16 +160,143 @@ func TestParseBadIntervalEvents(t *testing.T) {
Interval: "10s",
Description: "The best rule ever!",
}
- _, err := eventMonitor.parseEvent(&event1, "GroupName", "ProcessName")
+ _, err := eventMonitor.parseEvent(&event1, "GroupName", "ProcessName",
+ "alert")
if err != nil {
assert.Equal(t,
"Rule 'memory_used>2gb' duration / interval must be an integer.",
err.Error())
}
- _, err = eventMonitor.parseEvent(&event2, "GroupName", "ProcessName")
+ _, err = eventMonitor.parseEvent(&event2, "GroupName", "ProcessName", "alert")
if err != nil {
assert.Equal(t,
"Rule 'cpu_percent>60' duration / interval must be greater than 1. It "+
"is '10 / 10'.", err.Error())
}
}
+
+type FakeControl struct {
+ numDoActionCalled int
+ lastActionCalled int
+ isMonitoring bool
+}
+
+func (fc *FakeControl) DoAction(name string, action int) error {
+ fc.numDoActionCalled++
+ fc.lastActionCalled = action
+ return nil
+}
+
+func (fc *FakeControl) IsMonitoring(process *Process) bool {
+ return fc.isMonitoring
+}
+
+func TestActionTriggers(t *testing.T) {
+ fc := RegisterNewFakeControl()
+ fc.isMonitoring = true
+ eventMonitor.configManager = &ConfigManager{}
+ eventMonitor.configManager.Settings = &Settings{}
+ process := &Process{
+ Name: "ProcessName",
+ MonitorMode: "active",
+ }
+ event := Event{
+ Rule: "memory_used>2mb",
+ Duration: "10s",
+ Interval: "10s",
+ Description: "The best rule ever!",
+ }
+ parsedEvent, _ :=
+ eventMonitor.parseEvent(&event, "GroupName", "ProcessName", "stop")
+ err := eventMonitor.triggerAction(process, parsedEvent, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, 1, fc.numDoActionCalled)
+ assert.Equal(t, ACTION_STOP, fc.lastActionCalled)
+
+ parsedEvent, _ =
+ eventMonitor.parseEvent(&event, "GroupName", "ProcessName", "start")
+ err = eventMonitor.triggerAction(process, parsedEvent, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, 2, fc.numDoActionCalled)
+ assert.Equal(t, ACTION_START, fc.lastActionCalled)
+
+ parsedEvent, _ =
+ eventMonitor.parseEvent(&event, "GroupName", "ProcessName", "restart")
+ err = eventMonitor.triggerAction(process, parsedEvent, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, 3, fc.numDoActionCalled)
+ assert.Equal(t, ACTION_RESTART, fc.lastActionCalled)
+
+ parsedEvent, _ =
+ eventMonitor.parseEvent(&event, "GroupName", "ProcessName", "alert")
+ err = eventMonitor.triggerAction(process, parsedEvent, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, 3, fc.numDoActionCalled)
+
+ _, err =
+ eventMonitor.parseEvent(&event, "GroupName", "ProcessName", "doesntexist")
+ assert.Equal(t, "No event action 'doesntexist' exists. Valid actions are "+
+ "[stop, start, restart, alert].", err.Error())
+ parsedEvent.action = "doesntexist"
+ err = eventMonitor.triggerAction(process, parsedEvent, 0)
+ assert.Equal(t, "No event action 'doesntexist' exists.", err.Error())
+
+ eventMonitor = EventMonitor{}
+}
+
+func TestMonitoringModes(t *testing.T) {
+ fc := RegisterNewFakeControl()
+ fc.isMonitoring = true
+ eventMonitor.configManager = &ConfigManager{}
+ eventMonitor.configManager.Settings = &Settings{}
+ process := &Process{
+ Name: "ProcessName",
+ MonitorMode: "active",
+ }
+ event := Event{
+ Rule: "memory_used>2mb",
+ Duration: "10s",
+ Interval: "10s",
+ Description: "The best rule ever!",
+ }
+ parsedEvent, _ :=
+ eventMonitor.parseEvent(&event, "GroupName", "ProcessName", "stop")
+ err := eventMonitor.triggerAction(process, parsedEvent, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, 1, fc.numDoActionCalled)
+ assert.Equal(t, ACTION_STOP, fc.lastActionCalled)
+
+ // We shouldn't trigger the action in passive mode.
+ process.MonitorMode = "passive"
+ parsedEvent, _ =
+ eventMonitor.parseEvent(&event, "GroupName", "ProcessName", "start")
+ err = eventMonitor.triggerAction(process, parsedEvent, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, 1, fc.numDoActionCalled)
+ assert.Equal(t, ACTION_STOP, fc.lastActionCalled)
+
+ // We shouldn't trigger the action in manual mode.
+ process.MonitorMode = "manual"
+ parsedEvent, _ =
+ eventMonitor.parseEvent(&event, "GroupName", "ProcessName", "start")
+ err = eventMonitor.triggerAction(process, parsedEvent, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, 1, fc.numDoActionCalled)
+ assert.Equal(t, ACTION_STOP, fc.lastActionCalled)
+
+ eventMonitor = EventMonitor{}
+}
View
24 gonit/main.go
@@ -34,8 +34,9 @@ var (
defaultRpcUrl = filepath.Join(home, "."+name+".sock")
// internal
- api *gonit.API
- rpcServer *gonit.RpcServer
+ api *gonit.API
+ rpcServer *gonit.RpcServer
+ eventMonitor *gonit.EventMonitor
)
func main() {
@@ -56,11 +57,10 @@ func main() {
}
api = gonit.NewAPI(configManager)
-
args := flag.Args()
if len(args) == 0 {
if polltime != 0 {
- runDaemon()
+ runDaemon(api.Control, configManager)
} else {
log.Fatal("Nothing todo (yet)")
}
@@ -186,7 +186,7 @@ func wakeup() {
func shutdown() {
log.Printf("Quit")
-
+ eventMonitor.Stop()
if rpcServer != nil {
rpcServer.Shutdown()
}
@@ -237,7 +237,7 @@ func isRunning() bool {
return err == nil && syscall.Kill(pid, 0) == nil
}
-func runDaemon() {
+func runDaemon(control *gonit.Control, configManager *gonit.ConfigManager) {
if isRunning() {
log.Fatalf("%s daemon is already running", name)
}
@@ -252,11 +252,21 @@ func runDaemon() {
log.Fatal(err)
}
defer os.Remove(pidfile)
-
+ createEventMonitor(control, configManager)
start()
loop()
}
+func createEventMonitor(control *gonit.Control,
+ configManager *gonit.ConfigManager) {
+ eventMonitor = &gonit.EventMonitor{}
+ err := eventMonitor.Start(configManager, control)
+ if err != nil {
+ log.Fatal(err)
+ }
+ control.RegisterEventMonitor(eventMonitor)
+}
+
func showVersion() {
fmt.Printf("Gonit version %s\n", gonit.VERSION)
}
View
45 resourcemanager.go
@@ -42,6 +42,9 @@ func (s *SigarGetter) getProcTime(pid int) (uint64, error) {
type ResourceManager struct {
resourceHolders []*ResourceHolder
sigarInterface SigarInterface
+ // Used by eventmonitor to cache resources so they don't get pulled multiple
+ // times when multiple rules are being checked for the same resource.
+ cachedResources map[string]uint64
}
type ResourceHolder struct {
@@ -53,7 +56,8 @@ type ResourceHolder struct {
}
var resourceManager ResourceManager = ResourceManager{
- sigarInterface: &SigarGetter{},
+ sigarInterface: &SigarGetter{},
+ cachedResources: map[string]uint64{},
}
type DataTimestamp struct {
@@ -79,6 +83,18 @@ var validResourceNames = map[string]bool{
// Cleans data from ResourceManager.
func (r *ResourceManager) CleanData() {
r.resourceHolders = []*ResourceHolder{}
+ r.ClearCachedResources()
+}
+
+// Cleans up the resource data used for a process's event monitors.
+func (r *ResourceManager) CleanDataForProcess(p *Process) {
+ for _, resourceHolder := range r.resourceHolders {
+ if resourceHolder.processName == p.Name {
+ resourceHolder.dataTimestamps = []*DataTimestamp{}
+ resourceHolder.firstEntryIndex = 0
+ }
+ }
+ r.ClearCachedResources()
}
// Get the nth entry in the data. Accepts a negative number, as well, so that
@@ -140,9 +156,27 @@ func (r *ResourceManager) getResourceHolder(
}
resourceHolder.maxDataToStore = int64(math.Ceil(duration / interval))
+ r.resourceHolders = append(r.resourceHolders, resourceHolder)
return resourceHolder
}
+// Sets an entry in the resources cache.
+func (r *ResourceManager) setCachedResource(resourceName string, value uint64) {
+ r.cachedResources[resourceName] = value
+}
+
+// Gets an entry in the resources cache.
+func (r *ResourceManager) getCachedResource(
+ resourceName string) (uint64, bool) {
+ value, has_key := r.cachedResources[resourceName]
+ return value, has_key
+}
+
+// Clears the resources cache.
+func (r *ResourceManager) ClearCachedResources() {
+ r.cachedResources = map[string]uint64{}
+}
+
// Given a ParsedEvent, will populate the correct resourceHolder with the
// resource value.
func (r *ResourceManager) gatherResource(parsedEvent *ParsedEvent,
@@ -151,7 +185,6 @@ func (r *ResourceManager) gatherResource(parsedEvent *ParsedEvent,
if err := r.gather(pid, resourceHolder); err != nil {
return err
}
- r.resourceHolders = append(r.resourceHolders, resourceHolder)
return nil
}
@@ -232,9 +265,16 @@ func (r *ResourceManager) GetResource(parsedEvent *ParsedEvent,
pid int) (uint64, error) {
resourceName := parsedEvent.resourceName
processName := parsedEvent.processName
+
+ data, has_key := r.getCachedResource(resourceName)
+ if has_key {
+ return data, nil
+ }
+
if err := r.gatherResource(parsedEvent, pid); err != nil {
return 0, err
}
+
for _, resourceHolder := range r.resourceHolders {
lenResourceData := len(resourceHolder.dataTimestamps)
if resourceHolder.processName == processName &&
@@ -243,6 +283,7 @@ func (r *ResourceManager) GetResource(parsedEvent *ParsedEvent,
if err != nil {
return 0, err
}
+ r.setCachedResource(resourceName, data)
return data, nil
}
}
View
31 resourcemanager_test.go
@@ -27,7 +27,10 @@ func (s *FakeSigarGetter) getProcTime(pid int) (uint64, error) {
var r ResourceManager
func Setup() {
- r = ResourceManager{sigarInterface: &SigarGetter{}}
+ r = ResourceManager{
+ sigarInterface: &SigarGetter{},
+ cachedResources: map[string]uint64{},
+ }
}
func TestCalculateProcPercent(t *testing.T) {
@@ -93,6 +96,7 @@ func TestGatherProcPercent(t *testing.T) {
timeThen := r.resourceHolders[0].dataTimestamps[0].nanoTimestamp
// Set the time to be 1 second ago.
r.resourceHolders[0].dataTimestamps[0].nanoTimestamp = timeThen - 1000000000
+ r.ClearCachedResources()
resourceVal, err = r.GetResource(pe, 1234)
if err != nil {
t.Fatal(err)
@@ -155,6 +159,7 @@ func TestCircularProcData(t *testing.T) {
procUsed: uint64(2886 + i),
}
r.SetSigarInterface(fsg)
+ r.ClearCachedResources()
_, err := r.GetResource(pe, 1234)
if err != nil {
t.Fatal(err)
@@ -172,6 +177,7 @@ func TestCircularProcData(t *testing.T) {
procUsed: uint64(2897),
}
r.SetSigarInterface(fsg)
+ r.ClearCachedResources()
_, err := r.GetResource(pe, 1234)
if err != nil {
t.Fatal(err)
@@ -195,6 +201,7 @@ func TestDuration(t *testing.T) {
procUsed: uint64(2886 + i),
}
r.SetSigarInterface(fsg)
+ r.ClearCachedResources()
_, err := r.GetResource(pe, 1234)
if err != nil {
t.Fatal(err)
@@ -210,6 +217,7 @@ func TestDuration(t *testing.T) {
procUsed: uint64(4000),
}
r.SetSigarInterface(fsg)
+ r.ClearCachedResources()
resourceVal, err := r.GetResource(pe, 1234)
if err != nil {
t.Fatal(err)
@@ -220,3 +228,24 @@ func TestDuration(t *testing.T) {
expectedPercent := uint64(100 * (float64(valDiff) / float64(timeDiff)))
assert.Equal(t, expectedPercent, resourceVal)
}
+
+func TestCleanProcData(t *testing.T) {
+ Setup()
+ dts := &DataTimestamp{
+ data: 1,
+ nanoTimestamp: 1,
+ }
+ rh := &ResourceHolder{
+ dataTimestamps: []*DataTimestamp{dts},
+ firstEntryIndex: 1,
+ processName: "process",
+ resourceName: "resource",
+ }
+ proc := &Process{Name: "process"}
+ r.resourceHolders = append(r.resourceHolders, rh)
+ assert.Equal(t, 1, len(r.resourceHolders[0].dataTimestamps))
+ assert.Equal(t, int64(1), r.resourceHolders[0].firstEntryIndex)
+ r.CleanDataForProcess(proc)
+ assert.Equal(t, 0, len(r.resourceHolders[0].dataTimestamps))
+ assert.Equal(t, int64(0), r.resourceHolders[0].firstEntryIndex)
+}
View
2 test/config/dashboard-gonit.yml
@@ -13,7 +13,6 @@ processes:
start: /var/vcap/jobs/opentsdb/bin/opentsdb_ctl start
stop: /var/vcap/jobs/opentsdb/bin/opentsdb_ctl stop
user: vcap
- monitor: true
dashboard:
description: The cloud foundry dashboard.
actions:
@@ -26,7 +25,6 @@ processes:
stop: /var/vcap/jobs/dashboard/bin/dashboard_ctl stop
gid: vcap
user: vcap
- monitor: true
events:
memory_over_5:
description: The memory for a process is too high.
View
30 test/process/main.go
@@ -19,13 +19,14 @@ import (
)
var (
- fork = flag.Bool("F", false, "Fork me")
- grand = flag.Bool("G", false, "Behave as grandchild process")
- dir = flag.String("d", os.TempDir(), "test directory")
- name = flag.String("n", "test", "process name")
- pidfile = flag.String("p", "test.pid", "process pid file")
- sleep = flag.String("s", "10s", "sleep duration")
- exit = flag.Int("x", 0, "exit code")
+ fork = flag.Bool("F", false, "Fork me")
+ grand = flag.Bool("G", false, "Behave as grandchild process")
+ memballoon = flag.Bool("MB", false, "balloon memory used")
+ dir = flag.String("d", os.TempDir(), "test directory")
+ name = flag.String("n", "test", "process name")
+ pidfile = flag.String("p", "test.pid", "process pid file")
+ sleep = flag.String("s", "10s", "sleep duration")
+ exit = flag.Int("x", 0, "exit code")
)
var restarts = 0
@@ -90,6 +91,15 @@ func forkme() {
os.Exit(0)
}
+func balloon() {
+ dess := make([][]float64, 300)
+ for i := 0; i < 300; i++ {
+ fmt.Fprintf(os.Stdout, "Ballooning %v\n", i)
+ dess[i] = make([]float64, 900000)
+ time.Sleep(time.Duration(100) * time.Millisecond)
+ }
+}
+
func main() {
flag.Parse()
log.SetFlags(log.Ltime | log.Lshortfile)
@@ -112,7 +122,11 @@ func main() {
saveProcessInfo()
fmt.Fprintf(os.Stdout, "Started. [sleep(%s)]\n", *sleep)
- time.Sleep(pause)
+ if *memballoon {
+ balloon()
+ } else {
+ time.Sleep(pause)
+ }
fmt.Fprintf(os.Stdout, "Stopped. [exit(%d)]\n", *exit)
os.Exit(*exit)

0 comments on commit cb0d39f

Please sign in to comment.