Skip to content

Commit

Permalink
Lot of work on alerts and other stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
Zef Hemel committed Sep 7, 2017
1 parent d5b49e6 commit e23c71f
Show file tree
Hide file tree
Showing 14 changed files with 468 additions and 152 deletions.
84 changes: 84 additions & 0 deletions cmd/ax/alert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package main

import (
"fmt"
"time"

"github.com/zefhemel/ax/pkg/alert"
"github.com/zefhemel/ax/pkg/alert/slack"
"github.com/zefhemel/ax/pkg/backend/common"
"github.com/zefhemel/ax/pkg/cache"
"github.com/zefhemel/ax/pkg/config"
)

var (
alertFlags = addQueryFlags(addAlertCommand)
alertFlagName string
)

func init() {
addAlertCommand.Flag("name", "Name for alert").Required().StringVar(&alertFlagName)
}

func setupDest() alert.Alerter {
return nil
}

func addAlertMain(rc config.RuntimeConfig, client common.Client) {
alertConfig := config.AlertConfig{
Env: rc.ActiveEnv,
Name: alertFlagName,
Selector: *alertFlags,
}

fmt.Printf("Config: %+v\n", alertConfig)
conf := config.LoadConfig()
conf.Alerts = append(conf.Alerts, alertConfig)
config.SaveConfig(conf)
}

func watchAlerts(rc config.RuntimeConfig, alertConfig config.AlertConfig) {
var alerter alert.Alerter
switch alertConfig.Service["backend"] {
case "slack":
alerter = slack.New(alertConfig.Name, alertConfig.Service)
default:
panic("No such backend")
}
query := querySelectorsToQuery(&alertConfig.Selector)
query.Follow = true
query.MaxResults = 5
seenIdCache := cache.New(fmt.Sprintf("%s/alert-%s-seen.json", rc.DataDir, alertConfig.Name))
client := determineClient(rc.Config.Environments[alertConfig.Env])
if client == nil {
fmt.Println("Cannot obtain a client for", alertConfig)
return
}
fmt.Println("Now waiting for alerts for", alertConfig.Name)
for message := range client.Query(query) {
if seenIdCache.Contains(message.UniqueID()) {
//fmt.Println("Skipping one")
continue
}
expire := time.Now().Add(time.Hour * 24 * 30)
seenIdCache.Set(message.UniqueID(), true, &expire)
fmt.Printf("[%s] Sending alert to %s: %+v\n", alertConfig.Name, alertConfig.Service["backend"], message.Map())
err := alerter.SendAlert(message)
if err != nil {
fmt.Println("Couldn't send alert", err)
}
err = seenIdCache.Flush()
if err != nil {
fmt.Println("Couldn't flush cache", err)
}
}
}

func alertMain(rc config.RuntimeConfig) {
for _, alert := range rc.Config.Alerts {
go watchAlerts(rc, alert)
}
for {
time.Sleep(time.Minute)
}
}
9 changes: 9 additions & 0 deletions cmd/ax/alert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package main

import (
"testing"
)

func TestSlack(t *testing.T) {
//alertSlack(os.Getenv("SLACK_TOKEN"), "#zeftest", "Sup from Go")
}
34 changes: 23 additions & 11 deletions cmd/ax/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,32 @@ import (
)

var (
queryCommand = kingpin.Command("query", "Query logs").Default()
queryCommand = kingpin.Command("query", "Query logs").Default()
alertCommand = kingpin.Command("alert", "Be alerted when logs match a query")
alertDCommand = kingpin.Command("alertd", "Be alerted when logs match a query")
addAlertCommand = alertCommand.Command("add", "Add new alert")
)

func main() {
func determineClient(em config.EnvMap) common.Client {
stat, _ := os.Stdin.Stat()
cmd := kingpin.Parse()

rc := config.BuildConfig()
var client common.Client
if (stat.Mode() & os.ModeCharDevice) == 0 {
client = stream.New(os.Stdin)
} else if rc.Env["backend"] == "docker" {
client = docker.New(rc.Env["pattern"])
} else if rc.Env["backend"] == "kibana" {
client = kibana.New(rc.Env["url"], rc.Env["auth"], rc.Env["index"])
} else if rc.Env["backend"] == "subprocess" {
client = subprocess.New(strings.Split(rc.Env["command"], " "))
} else if em["backend"] == "docker" {
client = docker.New(em["pattern"])
} else if em["backend"] == "kibana" {
client = kibana.New(em["url"], em["auth"], em["index"])
} else if em["backend"] == "subprocess" {
client = subprocess.New(strings.Split(em["command"], " "))
}
return client
}

func main() {
cmd := kingpin.Parse()

rc := config.BuildConfig()
client := determineClient(rc.Env)

switch cmd {
case "query":
Expand All @@ -54,6 +62,10 @@ func main() {
config.ListEnvs()
case "env edit":
config.EditConfig()
case "alert add":
addAlertMain(rc, client)
case "alertd":
alertMain(rc)
}

}
71 changes: 44 additions & 27 deletions cmd/ax/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,39 @@ import (
"github.com/araddon/dateparse"
"github.com/fatih/color"
"github.com/zefhemel/ax/pkg/backend/common"
"github.com/zefhemel/ax/pkg/complete"
"github.com/zefhemel/ax/pkg/config"
"github.com/zefhemel/ax/pkg/heuristic"
"github.com/zefhemel/kingpin"
yaml "gopkg.in/yaml.v2"
)

func addQueryFlags(cmd *kingpin.CmdClause) *common.QuerySelectors {
flags := &common.QuerySelectors{}
cmd.Flag("before", "Results from before").StringVar(&flags.Before)
cmd.Flag("after", "Results from after").StringVar(&flags.After)
cmd.Flag("select", "Fields to select").Short('s').HintAction(selectHintAction).StringsVar(&flags.Select)
cmd.Flag("where", "Add a filter").Short('w').HintAction(whereHintAction).StringsVar(&flags.Where)
cmd.Arg("query", "Query string").Default("").StringsVar(&flags.QueryString)
return flags
}

var (
queryBefore = queryCommand.Flag("before", "Results from before").String()
queryAfter = queryCommand.Flag("after", "Results from after").String()
queryMaxResults = queryCommand.Flag("results", "Maximum number of results").Short('n').Default("200").Int()
querySelect = queryCommand.Flag("select", "Fields to select").Short('s').HintAction(selectHintAction).Strings()
queryWhere = queryCommand.Flag("where", "Add a filter").Short('w').HintAction(whereHintAction).Strings()
//querySortDesc = queryCommand.Flag("desc", "Sort results reverse-chronologically").Default("false").Bool()
queryOutputFormat = queryCommand.Flag("output", "Output format: text|json|yaml").Short('o').Default("text").Enum("text", "yaml", "json", "pretty-json")
queryFollow = queryCommand.Flag("follow", "Follow log in quasi-realtime, similar to tail -f").Short('f').Default("false").Bool()
queryString = queryCommand.Arg("query", "Query string").Default("").Strings()
queryFlags = addQueryFlags(queryCommand)
queryFlagMaxResults int
queryFlagOutputFormat string
queryFlagFollow bool
)

func init() {
queryCommand.Flag("results", "Maximum number of results").Short('n').Default("200").IntVar(&queryFlagMaxResults)
queryCommand.Flag("output", "Output format: text|json|yaml").Short('o').Default("text").EnumVar(&queryFlagOutputFormat, "text", "yaml", "json", "pretty-json")
queryCommand.Flag("follow", "Follow log in quasi-realtime, similar to tail -f").Short('f').Default("false").BoolVar(&queryFlagFollow)
}

func whereHintAction() []string {
rc := config.BuildConfig()
resultList := make([]string, 0, 20)
for attrName, _ := range heuristic.GetCompletions(rc) {
for attrName, _ := range complete.GetCompletions(rc) {
resultList = append(resultList, fmt.Sprintf("%s=", attrName))
}
return resultList
Expand All @@ -39,7 +51,7 @@ func whereHintAction() []string {
func selectHintAction() []string {
rc := config.BuildConfig()
resultList := make([]string, 0, 20)
for attrName, _ := range heuristic.GetCompletions(rc) {
for attrName, _ := range complete.GetCompletions(rc) {
resultList = append(resultList, attrName)
}
return resultList
Expand All @@ -61,40 +73,45 @@ func buildFilters(wheres []string) []common.QueryFilter {
return filters
}

func queryMain(rc config.RuntimeConfig, client common.Client) {
func querySelectorsToQuery(flags *common.QuerySelectors) common.Query {
var before *time.Time
var after *time.Time
if *queryAfter != "" {
if flags.After != "" {
var err error
afterTime, err := dateparse.ParseLocal(*queryAfter)
afterTime, err := dateparse.ParseLocal(flags.After)
if err != nil {
fmt.Println("Could parse after date:", *queryAfter)
fmt.Println("Could parse after date:", flags.After)
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "Parsed --after as %s", afterTime.Format(common.TimeFormat))
after = &afterTime
}
if *queryBefore != "" {
if flags.Before != "" {
var err error
beforeTime, err := dateparse.ParseLocal(*queryBefore)
beforeTime, err := dateparse.ParseLocal(flags.Before)
if err != nil {
fmt.Println("Could parse before date:", *queryBefore)
fmt.Println("Could parse before date:", flags.Before)
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "Parsed --before as %s", beforeTime.Format(common.TimeFormat))
before = &beforeTime
}

for message := range heuristic.GatherCompletionInfo(rc, client.Query(common.Query{
QueryString: strings.Join(*queryString, " "),
return common.Query{
QueryString: strings.Join(flags.QueryString, " "),
Before: before,
After: after,
Filters: buildFilters(*queryWhere),
MaxResults: *queryMaxResults,
SelectFields: *querySelect,
Follow: *queryFollow,
})) {
printMessage(message, *queryOutputFormat)
Filters: buildFilters(flags.Where),
SelectFields: flags.Select,
}
}

func queryMain(rc config.RuntimeConfig, client common.Client) {
query := querySelectorsToQuery(queryFlags)
query.MaxResults = queryFlagMaxResults
query.Follow = queryFlagFollow
for message := range complete.GatherCompletionInfo(rc, client.Query(query)) {
printMessage(message, queryFlagOutputFormat)
}

}
Expand Down
1 change: 1 addition & 0 deletions cmd/ax/slack.token
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
xoxp-139242487670-137856077392-237793436246-2e631b7bf7134e6cc442ad496c4d189f
7 changes: 7 additions & 0 deletions pkg/alert/alert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package alert

import "github.com/zefhemel/ax/pkg/backend/common"

type Alerter interface {
SendAlert(lm common.LogMessage) error
}
59 changes: 59 additions & 0 deletions pkg/alert/slack/slack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package slack

import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"

yaml "gopkg.in/yaml.v2"

"github.com/zefhemel/ax/pkg/alert"
"github.com/zefhemel/ax/pkg/backend/common"
)

type SlackAlerter struct {
name string
token string
channel string
username string
}

func New(name string, config map[string]string) *SlackAlerter {
return &SlackAlerter{
name: name,
token: config["token"],
channel: config["channel"],
username: config["username"],
}
}

func (alerter *SlackAlerter) SendAlert(lm common.LogMessage) error {
form := url.Values{}
form.Add("token", alerter.token)
form.Add("icon_emoji", ":bell:")
form.Add("channel", alerter.channel)
buf, _ := yaml.Marshal(lm.Attributes)
form.Add("text", fmt.Sprintf("*[%s]* %s", lm.Timestamp.Format(common.TimeFormat), buf))
form.Add("username", alerter.username)

url := "https://slack.com/api/chat.postMessage"
req, err := http.NewRequest("POST", url, strings.NewReader(form.Encode()))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
all, _ := ioutil.ReadAll(res.Body)
return errors.New(string(all))
}
return nil
}

var _ alert.Alerter = &SlackAlerter{}
21 changes: 21 additions & 0 deletions pkg/backend/common/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"crypto/sha1"
"encoding/json"
"fmt"
"strings"
Expand Down Expand Up @@ -30,6 +31,14 @@ type Query struct {
Follow bool
}

type QuerySelectors struct {
Before string `yaml:"before,omitempty"`
After string `yaml:"after,omitempty"`
Select []string `yaml:"select,omitempty"`
Where []string `yaml:"where,omitempty"`
QueryString []string `yaml:"query,omitempty"`
}

type LogMessage struct {
ID string `json:"id,omitempty"`
Timestamp time.Time `json:"@timestamp"`
Expand All @@ -49,6 +58,18 @@ func (lm LogMessage) Map() map[string]interface{} {
return out
}

func (lm LogMessage) UniqueID() string {
if lm.ID != "" {
return lm.ID
}
// Didn't get a unique ID from our source, let's just SHA1 the message itself
m := lm.Map()
h := sha1.New()
encoder := json.NewEncoder(h)
encoder.Encode(&m)
return fmt.Sprintf("%x", h.Sum(nil))[0:10]
}

func NewLogMessage() LogMessage {
return LogMessage{
Attributes: make(map[string]interface{}),
Expand Down
Loading

0 comments on commit e23c71f

Please sign in to comment.