forked from open-falcon/falcon-plus
/
scheduler.go
119 lines (100 loc) · 2.42 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package plugins
import (
"bytes"
"encoding/json"
"github.com/open-falcon/falcon-plus/common/model"
"github.com/open-falcon/falcon-plus/modules/agent/g"
"github.com/toolkits/file"
"github.com/toolkits/sys"
"log"
"os/exec"
"path/filepath"
"syscall"
"time"
)
type PluginScheduler struct {
Ticker *time.Ticker
Plugin *Plugin
Quit chan struct{}
}
func NewPluginScheduler(p *Plugin) *PluginScheduler {
scheduler := PluginScheduler{Plugin: p}
scheduler.Ticker = time.NewTicker(time.Duration(p.Cycle) * time.Second)
scheduler.Quit = make(chan struct{})
return &scheduler
}
func (this *PluginScheduler) Schedule() {
go func() {
for {
select {
case <-this.Ticker.C:
PluginRun(this.Plugin)
case <-this.Quit:
this.Ticker.Stop()
return
}
}
}()
}
func (this *PluginScheduler) Stop() {
close(this.Quit)
}
func PluginRun(plugin *Plugin) {
timeout := plugin.Cycle*1000 - 500
fpath := filepath.Join(g.Config().Plugin.Dir, plugin.FilePath)
if !file.IsExist(fpath) {
log.Println("no such plugin:", fpath)
return
}
debug := g.Config().Debug
if debug {
log.Println(fpath, "running...")
}
cmd := exec.Command(fpath)
var stdout bytes.Buffer
cmd.Stdout = &stdout
var stderr bytes.Buffer
cmd.Stderr = &stderr
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
cmd.Start()
if debug {
log.Println("plugin started:", fpath)
}
err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond)
errStr := stderr.String()
if errStr != "" {
logFile := filepath.Join(g.Config().Plugin.LogDir, plugin.FilePath+".stderr.log")
if _, err = file.WriteString(logFile, errStr); err != nil {
log.Printf("[ERROR] write log to %s fail, error: %s\n", logFile, err)
}
}
if isTimeout {
// has be killed
if err == nil && debug {
log.Println("[INFO] timeout and kill process", fpath, "successfully")
}
if err != nil {
log.Println("[ERROR] kill process", fpath, "occur error:", err)
}
return
}
if err != nil {
log.Println("[ERROR] exec plugin", fpath, "fail. error:", err)
return
}
// exec successfully
data := stdout.Bytes()
if len(data) == 0 {
if debug {
log.Println("[DEBUG] stdout of", fpath, "is blank")
}
return
}
var metrics []*model.MetricValue
err = json.Unmarshal(data, &metrics)
if err != nil {
log.Printf("[ERROR] json.Unmarshal stdout of %s fail. error:%s stdout: \n%s\n", fpath, err, stdout.String())
return
}
g.SendToTransfer(metrics)
}