Skip to content

Commit

Permalink
changed name and interface funcs for KafkaProducer, and made it compa…
Browse files Browse the repository at this point in the history
…tible with gobbler's module system; configuration for Kafka (brokers)
  • Loading branch information
Cosmin Rentea committed Mar 29, 2017
1 parent 6487e57 commit 6ee8966
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 58 deletions.
8 changes: 8 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"strings"

"github.com/cosminrentea/gobbler/server/apns"
"github.com/cosminrentea/gobbler/server/configstring"
"github.com/cosminrentea/gobbler/server/fcm"
"github.com/cosminrentea/gobbler/server/kafka"
"github.com/cosminrentea/gobbler/server/sms"
"github.com/cosminrentea/gobbler/server/websocket"
)
Expand Down Expand Up @@ -75,6 +77,7 @@ type (
APNS apns.Config
SMS sms.Config
WS websocket.Config
KafkaProducer kafka.Config
Cluster ClusterConfig
}
)
Expand Down Expand Up @@ -234,6 +237,11 @@ var (
Default("/stream/").
String(),
},
KafkaProducer: kafka.Config{
Brokers: configstring.NewFromKingpin(
kingpin.Flag("kafka-brokers", `The list Kafka brokers to which Guble should connect (formatted as host:port, separated by spaces or commas)`).
Envar("GUBLE_KAFKA_BROKERS")),
},
}
)

Expand Down
38 changes: 38 additions & 0 deletions server/configstring/configstringlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package configstring

import (
"strings"

"gopkg.in/alecthomas/kingpin.v2"
)

type List []string

func NewFromKingpin(settings kingpin.Settings) *List {
sl := make(List, 0)
settings.SetValue(&sl)
return &sl
}

func (sl *List) Set(value string) error {
delimiter := " "
if strings.Contains(value, ",") {
delimiter = ","
}
slice := strings.Split(value, delimiter)
for _, s := range slice {
if s != "" {
*sl = append(*sl, s)
}
}
return nil
}

func (sl List) String() string {
res := "["
for _, s := range sl {
res = res + " " + s
}
res = res + "]"
return res
}
55 changes: 55 additions & 0 deletions server/kafka/kafkaproducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package kafka

import (
"github.com/Shopify/sarama"
"github.com/cosminrentea/gobbler/server/configstring"
"github.com/cosminrentea/gobbler/server/service"
)

type Producer interface {
service.Stopable
Report(topic string, bytes []byte)
}

type Config struct {
Brokers *configstring.List
}

type producer struct {
Config

asyncProducer sarama.AsyncProducer
}

func NewProducer(c Config) (Producer, error) {
logger.WithField("config", c).Info("NewProducer")
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V0_10_1_0
saramaConfig.Producer.Return.Errors = false
saramaConfig.Producer.Retry.Max = 10
p, err := sarama.NewAsyncProducer(*c.Brokers, saramaConfig)
if err != nil {
logger.WithError(err).Error("Could not create AsyncProducer")
return nil, err
}
return &producer{
Config: c,
asyncProducer: p,
}, nil
}

func (p *producer) Report(topic string, bytes []byte) {
p.asyncProducer.Input() <- &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(bytes),
}
}

func (p *producer) Stop() error {
logger.Info("Stop")
if err := p.asyncProducer.Close(); err != nil {
logger.WithError(err).Error("Could not close Kafka Producer")
return err
}
return nil
}
2 changes: 1 addition & 1 deletion server/kafkareporter/logger.go → server/kafka/logger.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kafkareporter
package kafka

import (
log "github.com/Sirupsen/logrus"
Expand Down
57 changes: 0 additions & 57 deletions server/kafkareporter/kafkareporter.go

This file was deleted.

0 comments on commit 6ee8966

Please sign in to comment.