Skip to content

Commit

Permalink
simplified kafkareporter
Browse files Browse the repository at this point in the history
  • Loading branch information
Cosmin Rentea committed Mar 28, 2017
1 parent 58d895b commit 6487e57
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions server/kafkareporter/kafkareporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,55 @@ package kafkareporter

import (
"github.com/Shopify/sarama"
"time"
"io"
)

type Reporter interface {
io.Closer
Report([]byte)
Stop() error
}

type KafkaReporterConfig struct {
BrokerAddr string
Topic string
type Config struct {
Brokers []string
Topic string
}

type kafkaReporter struct {
KafkaReporterConfig
type reporter struct {
Config

producer sarama.AsyncProducer
}

func NewReporter(c KafkaReporterConfig) (Reporter, error) {
func NewReporter(c Config) (Reporter, error) {
logger.WithField("config", c).Info("NewReporter")
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V0_10_1_0
producer, err := sarama.NewAsyncProducer([]string{c.BrokerAddr}, saramaConfig)
saramaConfig.Producer.Return.Errors = false
saramaConfig.Producer.Retry.Max = 10
saramaConfig.Producer.Retry.Backoff = time.Second
p, err := sarama.NewAsyncProducer(c.Brokers, saramaConfig)
if err != nil {
//TODO Cosmin
logger.WithError(err).Error("Could not create AsyncProducer")
return nil, err
}
return kafkaReporter{c, producer}, nil
return &reporter{
Config: c,
producer: p,
}, nil
}

func (kr *kafkaReporter) Report(b []byte) {
kr.producer.Input() <- &sarama.ProducerMessage{
Topic: kr.Topic,
func (r *reporter) Report(b []byte) {
r.producer.Input() <- &sarama.ProducerMessage{
Topic: r.Topic,
Value: sarama.ByteEncoder(b),
}
}

func (kr *kafkaReporter) Stop() error {
logger.Info("Stop")
if err := kr.producer.Close(); err != nil {
//TODO Cosmin
func (r *reporter) Close() error {
logger.Info("Close")
if err := r.producer.Close(); err != nil {
logger.WithError(err).Error("Could not close Kafka Producer")
return err
}
return nil
Expand Down

0 comments on commit 6487e57

Please sign in to comment.