Skip to content

Commit

Permalink
Add async and filter
Browse files Browse the repository at this point in the history
  • Loading branch information
evalphobia committed Sep 9, 2016
1 parent a99316f commit 9271e95
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
2 changes: 2 additions & 0 deletions config.go
Expand Up @@ -9,13 +9,15 @@ import (

const defaultRegion = "us-east-1"

// Config has AWS settings.
type Config struct {
AccessKey string
SecretKey string
Region string
Endpoint string
}

// AWSConfig creates *aws.Config object from the fields.
func (c Config) AWSConfig() *aws.Config {
cred := c.awsCredentials()
awsConf := &aws.Config{
Expand Down
40 changes: 33 additions & 7 deletions hook.go
Expand Up @@ -2,7 +2,6 @@ package logrus_kinesis

import (
"encoding/json"
"fmt"

"github.com/Sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
Expand All @@ -24,6 +23,7 @@ type KinesisHook struct {

defaultStreamName string
defaultPartitionKey string
async bool
levels []logrus.Level
ignoreFields map[string]struct{}
filters map[string]func(interface{}) interface{}
Expand Down Expand Up @@ -77,6 +77,12 @@ func (h *KinesisHook) SetPartitionKey(key string) {
h.defaultPartitionKey = key
}

// Async sets async flag and send log asynchroniously.
// If use this option, Fire() does not return error.
func (h *KinesisHook) Async() {
h.async = true
}

// AddIgnore adds field name to ignore.
func (h *KinesisHook) AddIgnore(name string) {
h.ignoreFields[name] = struct{}{}
Expand All @@ -87,9 +93,19 @@ func (h *KinesisHook) AddFilter(name string, fn func(interface{}) interface{}) {
h.filters[name] = fn
}

// Fire is invoked by logrus and sends log to fluentd logger.
// Fire is invoked by logrus and sends log to kinesis.
func (h *KinesisHook) Fire(entry *logrus.Entry) error {
fmt.Println(h.getStreamName(entry))
if !h.async {
return h.fire(entry)
}

// send log asynchroniously and return no error.
go h.fire(entry)
return nil
}

// Fire is invoked by logrus and sends log to kinesis.
func (h *KinesisHook) fire(entry *logrus.Entry) error {
in := &kinesis.PutRecordInput{
StreamName: stringPtr(h.getStreamName(entry)),
PartitionKey: stringPtr(h.getPartitionKey(entry)),
Expand Down Expand Up @@ -117,12 +133,22 @@ func (h *KinesisHook) getPartitionKey(entry *logrus.Entry) string {
}

func (h *KinesisHook) getData(entry *logrus.Entry) []byte {
d := entry.Data
if _, ok := d["message"]; !ok {
d["message"] = entry.Message
if _, ok := entry.Data["message"]; !ok {
entry.Data["message"] = entry.Message
}

data := make(logrus.Fields)
for k, v := range entry.Data {
if _, ok := h.ignoreFields[k]; ok {
continue
}
if fn, ok := h.filters[k]; ok {
v = fn(v)
}
data[k] = v
}

bytes, err := json.Marshal(d)
bytes, err := json.Marshal(data)
if err != nil {
return nil
}
Expand Down

0 comments on commit 9271e95

Please sign in to comment.