Skip to content

Commit

Permalink
Add json message output for kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
oivoodoo committed Nov 1, 2016
1 parent 9b96dc2 commit 41d3ce4
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 19 deletions.
8 changes: 5 additions & 3 deletions elasticsearch.go
Expand Up @@ -84,9 +84,11 @@ func (p *ESPlugin) Init(URI string) {
p.done = make(chan bool)
p.indexor.Start()

// Only start the ErrorHandler goroutine when in verbose mode
// no need to burn resources otherwise
go p.ErrorHandler()
if Settings.verbose {
// Only start the ErrorHandler goroutine when in verbose mode
// no need to burn resources otherwise
go p.ErrorHandler()
}

log.Println("Initialized Elasticsearch Plugin")
return
Expand Down
15 changes: 15 additions & 0 deletions examples/server.go
@@ -0,0 +1,15 @@
package main

import (
"io"
"net/http"
)

func hello(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, "Hello world!")
}

// main runs a minimal demo HTTP server on :8000 that answers every
// request via hello.
func main() {
	http.HandleFunc("/", hello)
	// ListenAndServe only returns on failure (e.g. the port is taken);
	// surface that instead of exiting silently with status 0.
	if err := http.ListenAndServe(":8000", nil); err != nil {
		panic(err)
	}
}
58 changes: 44 additions & 14 deletions output_kafka.go
@@ -1,7 +1,10 @@
package main

import (
"encoding/json"
"github.com/Shopify/sarama"
"github.com/buger/gor/proto"
"io"
"log"
"strings"
"time"
Expand All @@ -10,39 +13,56 @@ import (
// KafkaConfig holds the settings needed to build Kafka producers:
// a comma-separated broker host list and the destination topic.
// (The diff rendering duplicated the old zookeeper/topic fields next to
// the new host/topic fields; only the post-commit pair is valid Go.)
type KafkaConfig struct {
	host  string // comma-separated Kafka broker addresses
	topic string // topic every captured request is published to
}

// KafkaOutput publishes captured requests to Apache Kafka through an
// asynchronous sarama producer; it is constructed by NewKafkaOutput and
// written to via its Write method.
type KafkaOutput struct {
	address string // used as the Kafka message key in Write
	config *KafkaConfig // broker host list and target topic
	producer sarama.AsyncProducer // async producer created in NewKafkaOutput
}

// KafkaMessage holds the captured request fields that are serialized to
// JSON and sent to Apache Kafka. Fields tagged omitempty are dropped
// from the JSON payload when the corresponding header is absent.
type KafkaMessage struct {
	ReqURL string `json:"Req_URL"`
	ReqMethod string `json:"Req_Method"`
	ReqUserAgent string `json:"Req_User-Agent"`
	ReqAcceptLanguage string `json:"Req_Accept-Language,omitempty"`
	ReqAccept string `json:"Req_Accept,omitempty"`
	ReqAcceptEncoding string `json:"Req_Accept-Encoding,omitempty"`
	ReqIfModifiedSince string `json:"Req_If-Modified-Since,omitempty"`
	ReqConnection string `json:"Req_Connection,omitempty"`
	ReqCookies string `json:"Req_Cookies,omitempty"`
}

// KafkaOutputFrequency is the producer flush interval in milliseconds
// (multiplied by time.Millisecond in NewKafkaOutput).
const KafkaOutputFrequency = 500

// NewKafkaOutput creates instance of kafka producer client.
func NewKafkaOutput(address string, config *KafkaConfig) *KafkaOutput {
func NewKafkaOutput(address string, config *KafkaConfig) io.Writer {
c := sarama.NewConfig()
c.Producer.RequiredAcks = sarama.WaitForLocal
c.Producer.Compression = sarama.CompressionSnappy
c.Producer.Flush.Frequency = 500 * time.Millisecond
c.Producer.Flush.Frequency = KafkaOutputFrequency * time.Millisecond

brokerList := strings.Split(config.zookeeper, ",")
brokerList := strings.Split(config.host, ",")

producer, err := sarama.NewAsyncProducer(brokerList, c)
if err != nil {
log.Fatalln("Failed to start Sarama(Kafka) producer:", err)
}

o := &KafkaOutput{
address: address,
config: config,
producer: producer,
}

// Start infinite loop for tracking errors for kafka producer.
go o.ErrorHandler()
if Settings.verbose {
// Start infinite loop for tracking errors for kafka producer.
go o.ErrorHandler()
}

return o
}
Expand All @@ -55,14 +75,24 @@ func (o *KafkaOutput) ErrorHandler() {
}

// Write implements io.Writer: it parses the raw captured HTTP request,
// serializes the interesting fields to JSON as a KafkaMessage, and queues
// the result on the async producer keyed by the output address.
// (The diff rendering kept the removed ByteEncoder lines alongside the
// new JSON lines; this is the post-commit version, plus error handling
// for json.Marshal, which was silently discarded.)
func (o *KafkaOutput) Write(data []byte) (n int, err error) {
	kafkaMessage := KafkaMessage{
		ReqURL:             string(proto.Path(data)),
		ReqMethod:          string(proto.Method(data)),
		ReqUserAgent:       string(proto.Header(data, []byte("User-Agent"))),
		ReqAcceptLanguage:  string(proto.Header(data, []byte("Accept-Language"))),
		ReqAccept:          string(proto.Header(data, []byte("Accept"))),
		ReqAcceptEncoding:  string(proto.Header(data, []byte("Accept-Encoding"))),
		ReqIfModifiedSince: string(proto.Header(data, []byte("If-Modified-Since"))),
		ReqConnection:      string(proto.Header(data, []byte("Connection"))),
		ReqCookies:         string(proto.Header(data, []byte("Cookie"))),
	}

	jsonMessage, err := json.Marshal(&kafkaMessage)
	if err != nil {
		// Don't enqueue a broken payload; report the marshalling error.
		return 0, err
	}
	message := sarama.StringEncoder(jsonMessage)

	o.producer.Input() <- &sarama.ProducerMessage{
		Topic: o.config.topic,
		Key:   sarama.StringEncoder(o.address),
		Value: message,
	}

	// NOTE(review): returns the encoded length, not len(data) as the
	// io.Writer contract suggests — preserved from the original.
	return len(message), nil
}
2 changes: 1 addition & 1 deletion plugins.go
Expand Up @@ -143,5 +143,5 @@ func InitPlugins() {
registerPlugin(NewHTTPOutput, options, &Settings.outputHTTPConfig)
}

registerPlugin(NewKafkaOutput, &Settings.outputKafkaConfig)
registerPlugin(NewKafkaOutput, "", &Settings.outputKafkaConfig)
}
3 changes: 2 additions & 1 deletion settings.go
Expand Up @@ -127,7 +127,8 @@ func init() {
flag.BoolVar(&Settings.outputHTTPConfig.Debug, "output-http-debug", false, "Enables http debug output.")

flag.StringVar(&Settings.outputHTTPConfig.elasticSearch, "output-http-elasticsearch", "", "Send request and response stats to ElasticSearch:\n\tgor --input-raw :8080 --output-http staging.com --output-http-elasticsearch 'es_host:api_port/index_name'")
flag.StringVar(&Settings.outputKafkaConfig.zookeeper, "output-kafka-zookeeper", "", "Send request and response stats to Kafka:\n\tgor --input-raw :8080 --output-kafka-zookeeper '192.168.0.1:2181,192.168.0.2:2181'")

flag.StringVar(&Settings.outputKafkaConfig.host, "output-kafka-host", "", "Send request and response stats to Kafka:\n\tgor --input-raw :8080 --output-kafka-host '192.168.0.1:2181,192.168.0.2:2181'")
flag.StringVar(&Settings.outputKafkaConfig.topic, "output-kafka-topic", "", "Send request and response stats to Kafka:\n\tgor --input-raw :8080 --output-kafka-topic 'kafka-log'")

flag.Var(&Settings.modifierConfig.headers, "http-set-header", "Inject additional headers to http reqest:\n\tgor --input-raw :8080 --output-http staging.com --http-set-header 'User-Agent: Gor'")
Expand Down

0 comments on commit 41d3ce4

Please sign in to comment.