Cloudwatch back-end (#19)
* Initial cloudwatch implementation

* Updated with `context`

* Refactored signal handling slightly

* Fixes for cloudwatch

* Initial cloudwatch implementation

* Updated with `context`

* Fixes for cloudwatch

* Factored out retry logic and LogMessage channel deduplication

* More graceful recovery upon parse errors of the attribute cache, also: removing slack.token

* Follow support for cloudwatch

* README update for cloudwatch

* Context only for queries, not handled elsewhere yet.

* Implemented --before and --after for cloudwatch

* Graceful cache fallback

* Rename 'client' to 'logs'

* Fixes and refactoring of follow requerying
zefhemel authored and romanlevin committed Apr 19, 2018
1 parent 789fcff commit 2510860
Showing 13 changed files with 495 additions and 99 deletions.
21 changes: 13 additions & 8 deletions README.md
@@ -1,16 +1,18 @@
# Ax

![Logo](https://raw.githubusercontent.com/egnyte/ax/master/ax.png)

[![Travis CI status image](https://travis-ci.org/egnyte/ax.svg?branch=master)](https://travis-ci.org/egnyte/ax)

# Ax
It's a structured logging world we live in, but do we really have to look at JSON logs? Not with Ax.

Ax features:

* Read logs from various sources, currently:
* Kibana
* Piped input
* Docker containers
* [Kibana](https://www.elastic.co/products/kibana)
* [AWS Cloudwatch Logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/WhatIsCloudWatchLogs.html)
* Piped input
* Docker containers
* Filter logs based on attribute (field) values as well as text phrase search
* Select only the attributes you are interested in
* The ability to "follow" logs (Ax keeps running and shows new results as they come in)
@@ -48,7 +50,6 @@ This will also put the `ax` binary into your `$GOPATH/bin` so make sure that's i
To update Ax to the latest and greatest, just rerun the command above.

## Development

After the above `go get` call, you will have a git checkout of the repo under `$GOPATH/src/github.com/egnyte/ax`. If you want to work on Ax, just fork the repo and update `.git/config` appropriately.

To make sure you're building Ax with the appropriate versions of its dependencies, run:
@@ -76,12 +77,12 @@ For zsh, add to `~/.zshrc`:

After this, you can auto-complete commands, flags, environments, docker container names and even attribute names by hitting TAB. Use it, love it, never go back.

## Setup with Kibana
To set up Ax for use with Kibana, run:
## Setup with Kibana or Cloudwatch
To set up Ax for use with Kibana or Cloudwatch, run:

ax env add

This will prompt you for a name, a backend type (kibana in this case), a URL (plus a username and password if the URL is protected by basic auth), and an index.
This will prompt you for a name, a backend type, and various other settings depending on your backend of choice. After a successful setup, you should be ready to go.

To see if it works, just run:

@@ -108,6 +109,7 @@ You can also pipe logs directly into Ax:
tail -f /var/log/something.log | ax

# Filtering and selecting attributes

Looking at all logs is nice, but it only gets really interesting once you can filter entries and select only the attributes you care about.

To search for all logs containing the phrase "Traceback":
@@ -129,11 +131,13 @@ If you have a lot of extra attributes in your log messages, you can select just
ax --where domain=zef --select message --select tag

# "Tailing" logs

Use the `-f` flag:

ax -f --where domain=zef

# Different output formats

Don't like the default textual output, perhaps you prefer YAML:

ax --output yaml
@@ -148,4 +152,5 @@ or pretty JSON:
ax query --help

# Found anything broken?

Report it as a Github issue!
8 changes: 5 additions & 3 deletions cmd/ax/main.go
@@ -10,6 +10,7 @@ import (

	"github.com/zefhemel/kingpin"

	"github.com/egnyte/ax/pkg/backend/cloudwatch"
	"github.com/egnyte/ax/pkg/backend/common"
	"github.com/egnyte/ax/pkg/backend/docker"
	"github.com/egnyte/ax/pkg/backend/kibana"
@@ -36,6 +37,8 @@ func determineClient(em config.EnvMap) common.Client {
		client = docker.New(em["pattern"])
	} else if em["backend"] == "kibana" {
		client = kibana.New(em["url"], em["auth"], em["index"])
	} else if em["backend"] == "cloudwatch" {
		client = cloudwatch.New(em["accesskey"], em["accesssecretkey"], em["region"], em["groupname"])
	} else if em["backend"] == "subprocess" {
		client = subprocess.New(strings.Split(em["command"], " "))
	}
@@ -63,10 +66,9 @@ func main() {
	rc := config.BuildConfig()
	client := determineClient(rc.Env)

	ctx := sigtermContextHandler(context.Background())

	switch cmd {
	case "query":
		ctx := sigtermContextHandler(context.Background())
		if client == nil {
			if len(rc.Config.Environments) == 0 {
				// Assuming first time use
@@ -87,7 +89,7 @@
	case "alert add":
		addAlertMain(rc, client)
	case "alertd":
		alertMain(ctx, rc)
		alertMain(context.Background(), rc)
	case "version":
		println(version)
	}
1 change: 0 additions & 1 deletion cmd/ax/slack.token

This file was deleted.

145 changes: 145 additions & 0 deletions pkg/backend/cloudwatch/client.go
@@ -0,0 +1,145 @@
package cloudwatch

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
	"github.com/egnyte/ax/pkg/backend/common"
)

type CloudwatchClient struct {
	logs      *cloudwatchlogs.CloudWatchLogs
	groupName string
}

func attemptParseJSON(str string) map[string]interface{} {
	m := make(map[string]interface{})
	// Find start of JSON blob
	startIdx := strings.Index(str, "{")
	if startIdx == -1 { // If not found, fall back to dumping the whole thing into the "message" field
		m["message"] = str
		return m
	}
	err := json.Unmarshal([]byte(str[startIdx:]), &m)
	if err != nil {
		m["message"] = str
	}
	return m
}

func logEventToMessage(query common.Query, logEvent *cloudwatchlogs.FilteredLogEvent) common.LogMessage {
	message := common.NewLogMessage()
	message.ID = *logEvent.EventId
	// The event timestamp is milliseconds since the epoch; time.Unix takes
	// (seconds, nanoseconds), so scale the millisecond remainder to nanoseconds.
	message.Timestamp = time.Unix(*logEvent.Timestamp/1000, (*logEvent.Timestamp%1000)*int64(time.Millisecond))
	message.Attributes = common.Project(attemptParseJSON(*logEvent.Message), query.SelectFields)
	return message
}

// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
func queryToFilterPattern(query common.Query) string {
	filterParts := make([]string, 0)
	for _, filter := range query.Filters {
		filterParts = append(filterParts, fmt.Sprintf("($.%s %s \"%s\")", filter.FieldName, filter.Operator, filter.Value))
	}
	var filterPattern string
	if len(query.Filters) == 0 {
		filterPattern = query.QueryString
	} else {
		filterPattern = fmt.Sprintf("%s { %s }", query.QueryString, strings.Join(filterParts, " && "))
	}

	return strings.TrimSpace(filterPattern)
}

func (client *CloudwatchClient) readLogBatch(ctx context.Context, query common.Query) ([]common.LogMessage, error) {
	var startTime, endTime *int64
	if query.After != nil {
		startTimeVal := query.After.UnixNano() / int64(time.Millisecond)
		startTime = &startTimeVal
	}
	if query.Before != nil {
		endTimeVal := query.Before.UnixNano() / int64(time.Millisecond)
		endTime = &endTimeVal
	}
	resp, err := client.logs.FilterLogEventsWithContext(ctx, &cloudwatchlogs.FilterLogEventsInput{
		LogGroupName:  aws.String(client.groupName),
		FilterPattern: aws.String(queryToFilterPattern(query)),
		Limit:         aws.Int64(int64(query.MaxResults)),
		StartTime:     startTime,
		EndTime:       endTime,
	})
	if err != nil {
		return nil, err
	}
	messages := make([]common.LogMessage, 0, len(resp.Events))
	for _, event := range resp.Events {
		messages = append(messages, logEventToMessage(query, event))
	}
	return messages, nil
}

func (client *CloudwatchClient) Query(ctx context.Context, query common.Query) <-chan common.LogMessage {
	if query.Follow {
		return common.ReQueryFollow(ctx, func() ([]common.LogMessage, error) {
			return client.readLogBatch(ctx, query)
		})
	}
	resultChan := make(chan common.LogMessage)

	go func() {
		defer close(resultChan)
		messages, err := client.readLogBatch(ctx, query)
		if err != nil {
			fmt.Printf("Error while fetching logs: %s\n", err)
			return
		}
		for _, message := range messages {
			resultChan <- message
		}
	}()

	return resultChan
}

func (client *CloudwatchClient) ListGroups() ([]string, error) {
	resp, err := client.logs.DescribeLogGroups(&cloudwatchlogs.DescribeLogGroupsInput{})
	if err != nil {
		return nil, err
	}

	groupNames := make([]string, 0, len(resp.LogGroups))
	for _, group := range resp.LogGroups {
		groupNames = append(groupNames, *group.LogGroupName)
	}

	return groupNames, nil
}

func New(accessKey, accessSecretKey, region, groupName string) *CloudwatchClient {
	sess, err := session.NewSession(&aws.Config{
		Region:      aws.String(region),
		Credentials: credentials.NewStaticCredentials(accessKey, accessSecretKey, ""),
	})
	if err != nil {
		fmt.Printf("Could not create AWS session: %s\n", err)
		return nil
	}

	return &CloudwatchClient{
		logs:      cloudwatchlogs.New(sess),
		groupName: groupName,
	}
}

var _ common.Client = &CloudwatchClient{}
69 changes: 69 additions & 0 deletions pkg/backend/cloudwatch/client_test.go
@@ -0,0 +1,69 @@
package cloudwatch

import (
	"testing"

	"github.com/egnyte/ax/pkg/backend/common"
)

func TestParsing(t *testing.T) {
	msg := attemptParseJSON(`2017-09-27T09:01:01.245468966Z {"asctime": "2017-09-27 09:01:01,245", "created": 1506502861.2452097, "filename": "connectionpool.py", "funcName": "_make_request", "levelname": "DEBUG", "levelno": 10, "module": "connectionpool", "msecs": 245.2096939086914, "message": "http://localhost:None \"POST /v1.29/exec/1744fb9d8aa1ed1f94f729d4e0474251dfab9e0523385d42e77ea10acda53957/start HTTP/1.1\" 101 0", "name": "urllib3.connectionpool", "pathname": "/usr/local/lib/python3.6/site-packages/urllib3/connectionpool.py", "process": 5, "processName": "MainProcess", "relativeCreated": 2276.298999786377, "thread": 140018892404480, "threadName": "MainThread", "turbo_request_id": null, "user": null, "tid": 5, "source": "/usr/local/lib/python3.6/site-packages/urllib3/connectionpool.py:395", "client_id": null}
`)
	if msg["filename"] != "connectionpool.py" {
		t.Errorf("Parsed: %+v", msg)
	}
	msg = attemptParseJSON(`{"asctime": "2017-09-27 09:01:01,245", "created": 1506502861.2452097, "filename": "connectionpool.py", "funcName": "_make_request", "levelname": "DEBUG", "levelno": 10, "module": "connectionpool", "msecs": 245.2096939086914, "message": "http://localhost:None \"POST /v1.29/exec/1744fb9d8aa1ed1f94f729d4e0474251dfab9e0523385d42e77ea10acda53957/start HTTP/1.1\" 101 0", "name": "urllib3.connectionpool", "pathname": "/usr/local/lib/python3.6/site-packages/urllib3/connectionpool.py", "process": 5, "processName": "MainProcess", "relativeCreated": 2276.298999786377, "thread": 140018892404480, "threadName": "MainThread", "turbo_request_id": null, "user": null, "tid": 5, "source": "/usr/local/lib/python3.6/site-packages/urllib3/connectionpool.py:395", "client_id": null}
`)
	if msg["filename"] != "connectionpool.py" {
		t.Errorf("Parsed: %+v", msg)
	}
}

func TestFilterGenerator(t *testing.T) {
	output := queryToFilterPattern(common.Query{
		QueryString: "Test",
		Filters: []common.QueryFilter{
			{
				FieldName: "name",
				Operator:  "=",
				Value:     "zef",
			},
		},
	})
	if output != `Test { ($.name = "zef") }` {
		t.Fatal(output)
	}

	output = queryToFilterPattern(common.Query{
		Filters: []common.QueryFilter{
			{
				FieldName: "name",
				Operator:  "=",
				Value:     "zef",
			},
		},
	})
	if output != `{ ($.name = "zef") }` {
		t.Fatal(output)
	}

	output = queryToFilterPattern(common.Query{
		Filters: []common.QueryFilter{
			{
				FieldName: "name",
				Operator:  "=",
				Value:     "zef",
			},
			{
				FieldName: "age",
				Operator:  "=",
				Value:     "34",
			},
		},
	})
	if output != `{ ($.name = "zef") && ($.age = "34") }` {
		t.Fatal(output)
	}
}
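Follow mode in this backend delegates to `common.ReQueryFollow`, which (per the commit message "Factored out retry logic and LogMessage channel deduplication") re-runs the batch query and suppresses messages already emitted. A self-contained sketch of that requery-and-deduplicate idea, with hypothetical simplified types rather than the actual `common` package API:

```go
package main

import "fmt"

// LogMessage is a simplified stand-in for the real message type.
type LogMessage struct {
	ID      string
	Message string
}

// requeryFollow repeatedly calls queryFunc and emits only messages whose
// IDs have not been seen before, mimicking follow-mode deduplication.
func requeryFollow(queryFunc func() []LogMessage, rounds int) []LogMessage {
	seen := make(map[string]bool)
	var out []LogMessage
	for i := 0; i < rounds; i++ {
		for _, m := range queryFunc() {
			if seen[m.ID] {
				continue // already emitted in an earlier round
			}
			seen[m.ID] = true
			out = append(out, m)
		}
	}
	return out
}

func main() {
	// Two polls whose results overlap on message ID "2".
	batches := [][]LogMessage{
		{{ID: "1", Message: "a"}, {ID: "2", Message: "b"}},
		{{ID: "2", Message: "b"}, {ID: "3", Message: "c"}},
	}
	i := 0
	next := func() []LogMessage {
		b := batches[i]
		i++
		return b
	}
	for _, m := range requeryFollow(next, 2) {
		fmt.Println(m.ID, m.Message) // prints: 1 a, 2 b, 3 c (one line each)
	}
}
```

Re-queried time windows can overlap between polls, so ID-based deduplication keeps follow output free of repeats.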
