forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
141 lines (121 loc) · 2.69 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package talk
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"sync/atomic"
"github.com/influxdata/kapacitor/alert"
)
type Service struct {
configValue atomic.Value
logger *log.Logger
}
func NewService(c Config, l *log.Logger) *Service {
s := &Service{
logger: l,
}
s.configValue.Store(c)
return s
}
func (s *Service) Open() error {
return nil
}
func (s *Service) Close() error {
return nil
}
func (s *Service) config() Config {
return s.configValue.Load().(Config)
}
func (s *Service) Update(newConfig []interface{}) error {
if l := len(newConfig); l != 1 {
return fmt.Errorf("expected only one new config object, got %d", l)
}
if c, ok := newConfig[0].(Config); !ok {
return fmt.Errorf("expected config object to be of type %T, got %T", c, newConfig[0])
} else {
s.configValue.Store(c)
}
return nil
}
type testOptions struct {
Title string `json:"title"`
Text string `json:"text"`
}
func (s *Service) TestOptions() interface{} {
return &testOptions{
Title: "testTitle",
Text: "test talk text",
}
}
func (s *Service) Test(options interface{}) error {
o, ok := options.(*testOptions)
if !ok {
return fmt.Errorf("unexpected options type %T", options)
}
return s.Alert(o.Title, o.Text)
}
func (s *Service) Alert(title, text string) error {
url, post, err := s.preparePost(title, text)
if err != nil {
return err
}
resp, err := http.Post(url, "application/json", post)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
type response struct {
Error string `json:"error"`
}
r := &response{Error: fmt.Sprintf("failed to understand Talk response. code: %d content: %s", resp.StatusCode, string(body))}
dec := json.NewDecoder(resp.Body)
dec.Decode(r)
return errors.New(r.Error)
}
return nil
}
func (s *Service) preparePost(title, text string) (string, io.Reader, error) {
c := s.config()
if !c.Enabled {
return "", nil, errors.New("service is not enabled")
}
postData := make(map[string]interface{})
postData["title"] = title
postData["text"] = text
postData["authorName"] = c.AuthorName
var post bytes.Buffer
enc := json.NewEncoder(&post)
err := enc.Encode(postData)
if err != nil {
return "", nil, err
}
return c.URL, &post, nil
}
type handler struct {
s *Service
logger *log.Logger
}
func (s *Service) Handler(l *log.Logger) alert.Handler {
return &handler{
s: s,
logger: l,
}
}
func (h *handler) Handle(event alert.Event) {
if err := h.s.Alert(
event.State.ID,
event.State.Message,
); err != nil {
h.logger.Println("E! failed to send event to Talk", err)
}
}