Skip to content

Commit

Permalink
- compile message templates
Browse files Browse the repository at this point in the history
- send metrics to cachet
- fix http default configuration
  • Loading branch information
matejkramny committed Feb 6, 2017
1 parent b4fa33b commit b3bc1d4
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 87 deletions.
47 changes: 34 additions & 13 deletions api.go
Expand Up @@ -3,9 +3,13 @@ package cachet
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"strconv"
"time"

"github.com/Sirupsen/logrus"
)

type CachetAPI struct {
Expand All @@ -14,6 +18,10 @@ type CachetAPI struct {
Insecure bool `json:"insecure"`
}

type CachetResponse struct {
Data json.RawMessage `json:"data"`
}

func (api CachetAPI) Ping() error {
resp, _, err := api.NewRequest("GET", "/ping", nil)
if err != nil {
Expand All @@ -27,30 +35,43 @@ func (api CachetAPI) Ping() error {
return nil
}

func (api CachetAPI) NewRequest(requestType, url string, reqBody []byte) (*http.Response, []byte, error) {
// SendMetric adds a data point to a cachet monitor
func (api CachetAPI) SendMetric(id int, lag int64) {
logrus.Debugf("Sending lag metric ID:%d %vms", id, lag)

jsonBytes, _ := json.Marshal(map[string]interface{}{
"value": lag,
"timestamp": time.Now().Unix(),
})

resp, _, err := api.NewRequest("POST", "/metrics/"+strconv.Itoa(id)+"/points", jsonBytes)
if err != nil || resp.StatusCode != 200 {
logrus.Warnf("Could not log metric! ID: %d, err: %v", id, err)
}
}

// NewRequest wraps http.NewRequest
func (api CachetAPI) NewRequest(requestType, url string, reqBody []byte) (*http.Response, CachetResponse, error) {
req, err := http.NewRequest(requestType, api.URL+url, bytes.NewBuffer(reqBody))

req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Cachet-Token", api.Token)

transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
}
if api.Insecure {
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}

transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: api.Insecure}
client := &http.Client{
Transport: transport,
}

res, err := client.Do(req)
if err != nil {
return nil, []byte{}, err
return nil, CachetResponse{}, err
}

defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var body struct {
Data json.RawMessage `json:"data"`
}
err = json.NewDecoder(res.Body).Decode(&body)

return res, body, nil
return res, body, err
}
17 changes: 12 additions & 5 deletions cli/main.go
Expand Up @@ -92,10 +92,10 @@ func main() {

wg := &sync.WaitGroup{}
for index, monitor := range cfg.Monitors {
logrus.Infof("Starting Monitor #%d:", index)
logrus.Infof("Starting Monitor #%d: ", index)
logrus.Infof("Features: \n - %v", strings.Join(monitor.Describe(), "\n - "))

go monitor.ClockStart(cfg, wg)
go monitor.ClockStart(cfg, monitor, wg)
}

signals := make(chan os.Signal, 1)
Expand Down Expand Up @@ -164,6 +164,7 @@ func getConfiguration(path string) (*cachet.CachetMonitor, error) {
var t cachet.MonitorInterface
var err error

// get default type
monType := cachet.GetMonitorType("")
if t, ok := rawMonitor["type"].(string); ok {
monType = cachet.GetMonitorType(t)
Expand All @@ -175,11 +176,17 @@ func getConfiguration(path string) (*cachet.CachetMonitor, error) {
err = mapstructure.Decode(rawMonitor, &s)
t = &s
case "dns":
// t = cachet.DNSMonitor
var s cachet.DNSMonitor
err = mapstructure.Decode(rawMonitor, &s)
t = &s
case "icmp":
// t = cachet.ICMPMonitor
var s cachet.ICMPMonitor
err = mapstructure.Decode(rawMonitor, &s)
t = &s
case "tcp":
// t = cachet.TCPMonitor
var s cachet.TCPMonitor
err = mapstructure.Decode(rawMonitor, &s)
t = &s
default:
logrus.Errorf("Invalid monitor type (index: %d) %v", index, monType)
continue
Expand Down
4 changes: 3 additions & 1 deletion dns.go
@@ -1,3 +1,5 @@
package cachet

type DNSMonitor struct{}
type DNSMonitor struct {
AbstractMonitor `mapstructure:",squash"`
}
67 changes: 23 additions & 44 deletions http.go
Expand Up @@ -10,23 +10,23 @@ import (
"time"
)

// // Investigating template
// var HTTPTemplate = MessageTemplate{
// Subject: `{{ .Name }} - {{ .config.SystemName }}`,
// Message: `{{ .Name }} check **failed** - {{ .now }}
// Investigating template
var defaultHTTPInvestigatingTpl = MessageTemplate{
Subject: `{{ .Name }} - {{ .config.SystemName }}`,
Message: `{{ .Name }} check **failed** - {{ .now }}
// {{ .lastFailReason }}`,
// }
{{ .lastFailReason }}`,
}

// // Fixed template
// var HTTPTemplate = MessageTemplate{
// Subject: `{{ .Name }} - {{ .config.SystemName }}`,
// Message: `**Resolved** - {{ .now }}
// Fixed template
var defaultHTTPFixedTpl = MessageTemplate{
Subject: `{{ .Name }} - {{ .config.SystemName }}`,
Message: `**Resolved** - {{ .now }}
// - - -
- - -
// {{ .incident.Message }}`,
// }
{{ .incident.Message }}`,
}

type HTTPMonitor struct {
AbstractMonitor `mapstructure:",squash"`
Expand All @@ -41,32 +41,28 @@ type HTTPMonitor struct {
}

func (monitor *HTTPMonitor) test() bool {
client := &http.Client{
Timeout: time.Duration(monitor.Timeout * time.Second),
}
if monitor.Strict == false {
client.Transport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
}

req, err := http.NewRequest(monitor.Method, monitor.Target, nil)
for k, v := range monitor.Headers {
req.Header.Add(k, v)
}

transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: monitor.Strict == false}
client := &http.Client{
Timeout: time.Duration(monitor.Timeout * time.Second),
Transport: transport,
}

resp, err := client.Do(req)
if err != nil {
monitor.lastFailReason = err.Error()

return false
}

defer resp.Body.Close()

if monitor.ExpectedStatusCode > 0 && resp.StatusCode != monitor.ExpectedStatusCode {
monitor.lastFailReason = "Unexpected response code: " + strconv.Itoa(resp.StatusCode) + ". Expected " + strconv.Itoa(monitor.ExpectedStatusCode)

return false
}

Expand All @@ -75,7 +71,6 @@ func (monitor *HTTPMonitor) test() bool {
responseBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
monitor.lastFailReason = err.Error()

return false
}

Expand All @@ -91,6 +86,9 @@ func (monitor *HTTPMonitor) test() bool {
}

func (mon *HTTPMonitor) Validate() []string {
mon.Template.Investigating.SetDefault(defaultHTTPInvestigatingTpl)
mon.Template.Fixed.SetDefault(defaultHTTPFixedTpl)

errs := mon.AbstractMonitor.Validate()

if len(mon.ExpectedBody) > 0 {
Expand Down Expand Up @@ -125,22 +123,3 @@ func (mon *HTTPMonitor) Describe() []string {

return features
}

// SendMetric sends lag metric point
/*func (monitor *Monitor) SendMetric(delay int64) error {
if monitor.MetricID == 0 {
return nil
}
jsonBytes, _ := json.Marshal(&map[string]interface{}{
"value": delay,
})
resp, _, err := monitor.config.makeRequest("POST", "/metrics/"+strconv.Itoa(monitor.MetricID)+"/points", jsonBytes)
if err != nil || resp.StatusCode != 200 {
return fmt.Errorf("Could not log data point!\n%v\n", err)
}
return nil
}
*/
4 changes: 3 additions & 1 deletion icmp.go
@@ -1,3 +1,5 @@
package cachet

type ICMPMonitor struct{}
type ICMPMonitor struct {
AbstractMonitor `mapstructure:",squash"`
}
20 changes: 8 additions & 12 deletions incident.go
Expand Up @@ -57,15 +57,13 @@ func (incident *Incident) Send(cfg *CachetMonitor) error {
}

var data struct {
Incident struct {
ID int `json:"id"`
} `json:"data"`
ID int `json:"id"`
}
if err := json.Unmarshal(body, &data); err != nil {
return fmt.Errorf("Cannot parse incident body: %v, %v", err, string(body))
if err := json.Unmarshal(body.Data, &data); err != nil {
return fmt.Errorf("Cannot parse incident body: %v, %v", err, string(body.Data))
}

incident.ID = data.Incident.ID
incident.ID = data.ID
if resp.StatusCode != 200 {
return fmt.Errorf("Could not create/update incident!")
}
Expand All @@ -84,15 +82,13 @@ func (incident *Incident) GetComponentStatus(cfg *CachetMonitor) (int, error) {
}

var data struct {
Component struct {
Status int `json:"status"`
} `json:"data"`
Status int `json:"status"`
}
if err := json.Unmarshal(body, &data); err != nil {
return 0, fmt.Errorf("Cannot parse component body: %v. Err = %v", string(body), err)
if err := json.Unmarshal(body.Data, &data); err != nil {
return 0, fmt.Errorf("Cannot parse component body: %v. Err = %v", string(body.Data), err)
}

return data.Component.Status, nil
return data.Status, nil
}

// SetInvestigating sets status to Investigating
Expand Down
29 changes: 18 additions & 11 deletions monitor.go
Expand Up @@ -13,9 +13,9 @@ const DefaultTimeFormat = "15:04:05 Jan 2 MST"
const HistorySize = 10

type MonitorInterface interface {
ClockStart(*CachetMonitor, *sync.WaitGroup)
ClockStart(*CachetMonitor, MonitorInterface, *sync.WaitGroup)
ClockStop()
tick()
tick(MonitorInterface)
test() bool

Validate() []string
Expand Down Expand Up @@ -70,6 +70,10 @@ func (mon *AbstractMonitor) Validate() []string {
mon.Timeout = DefaultTimeout
}

if mon.Timeout > mon.Interval {
errs = append(errs, "Timeout greater than interval")
}

if mon.ComponentID == 0 && mon.MetricID == 0 {
errs = append(errs, "component_id & metric_id are unset")
}
Expand All @@ -78,6 +82,10 @@ func (mon *AbstractMonitor) Validate() []string {
mon.Threshold = 100
}

if err := mon.Template.Fixed.Compile(); err != nil {
errs = append(errs, "Could not compile template: "+err.Error())
}

return errs
}
func (mon *AbstractMonitor) GetMonitor() *AbstractMonitor {
Expand All @@ -93,19 +101,19 @@ func (mon *AbstractMonitor) Describe() []string {
return features
}

func (mon *AbstractMonitor) ClockStart(cfg *CachetMonitor, wg *sync.WaitGroup) {
func (mon *AbstractMonitor) ClockStart(cfg *CachetMonitor, iface MonitorInterface, wg *sync.WaitGroup) {
wg.Add(1)
mon.config = cfg
mon.stopC = make(chan bool)
if cfg.Immediate {
mon.tick()
mon.tick(iface)
}

ticker := time.NewTicker(mon.Interval * time.Second)
for {
select {
case <-ticker.C:
mon.tick()
mon.tick(iface)
case <-mon.stopC:
wg.Done()
return
Expand All @@ -124,9 +132,9 @@ func (mon *AbstractMonitor) ClockStop() {

func (mon *AbstractMonitor) test() bool { return false }

func (mon *AbstractMonitor) tick() {
func (mon *AbstractMonitor) tick(iface MonitorInterface) {
reqStart := getMs()
up := mon.test()
up := iface.test()
lag := getMs() - reqStart

if len(mon.history) == HistorySize-1 {
Expand All @@ -139,9 +147,8 @@ func (mon *AbstractMonitor) tick() {
mon.AnalyseData()

// report lag
if up && mon.MetricID > 0 {
logrus.Infof("%v", lag)
// mon.SendMetric(lag)
if mon.MetricID > 0 {
go mon.config.API.SendMetric(mon.MetricID, lag)
}
}

Expand All @@ -158,7 +165,7 @@ func (monitor *AbstractMonitor) AnalyseData() {
t := (float32(numDown) / float32(len(monitor.history))) * 100
logrus.Printf("%s %.2f%%/%.2f%% down at %v\n", monitor.Name, t, monitor.Threshold, time.Now().UnixNano()/int64(time.Second))

if len(monitor.history) != 10 {
if len(monitor.history) != HistorySize {
// not saturated
return
}
Expand Down
2 changes: 2 additions & 0 deletions tcp.go
@@ -1,6 +1,8 @@
package cachet

type TCPMonitor struct {
AbstractMonitor `mapstructure:",squash"`

// same as output from net.JoinHostPort
// defaults to parsed config from /etc/resolv.conf when empty
DNSServer string
Expand Down

0 comments on commit b3bc1d4

Please sign in to comment.