Skip to content

Commit

Permalink
Merge pull request #18 from cosminrentea/feature/reporting-kafka
Browse files Browse the repository at this point in the history
Reporting using kafka
  • Loading branch information
cosminrentea committed Apr 4, 2017
2 parents 1741c20 + 6f39216 commit 334d2e6
Show file tree
Hide file tree
Showing 15 changed files with 307 additions and 21 deletions.
6 changes: 6 additions & 0 deletions scripts/generate_mocks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,10 @@ $MOCKGEN -self_package router -package sms \
github.com/cosminrentea/gobbler/server/store \
MessageStore &

# server/configstring mocks
$MOCKGEN -package configstring \
-destination server/configstring/mocks_kingpin_gen_test.go \
gopkg.in/alecthomas/kingpin.v2 \
Settings &

wait
2 changes: 1 addition & 1 deletion server/apns/apns_pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func newApns2Client(certificate tls.Certificate) *apns2Client {
return c
}

// interface closable used used by apns_sender
// interface closable used by apns_sender
func (c *apns2Client) CloseTLS() {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
11 changes: 11 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"strings"

"github.com/cosminrentea/gobbler/server/apns"
"github.com/cosminrentea/gobbler/server/configstring"
"github.com/cosminrentea/gobbler/server/fcm"
"github.com/cosminrentea/gobbler/server/kafka"
"github.com/cosminrentea/gobbler/server/sms"
"github.com/cosminrentea/gobbler/server/websocket"
)
Expand Down Expand Up @@ -75,6 +77,7 @@ type (
APNS apns.Config
SMS sms.Config
WS websocket.Config
KafkaProducer kafka.Config
Cluster ClusterConfig
}
)
Expand Down Expand Up @@ -223,6 +226,9 @@ var (
Default(strconv.Itoa(runtime.NumCPU())).
Envar("GUBLE_SMS_WORKERS").
Int(),
KafkaReportingTopic:kingpin.Flag("sms-kafka-topic", "The name of the SMS-Reporting Kafka topic").
Envar("GUBLE_SMS_KAFKA_TOPIC").
String(),
IntervalMetrics: &defaultSMSMetrics,
},
WS: websocket.Config{
Expand All @@ -234,6 +240,11 @@ var (
Default("/stream/").
String(),
},
KafkaProducer: kafka.Config{
Brokers: configstring.NewFromKingpin(
kingpin.Flag("kafka-brokers", `The list Kafka brokers to which Guble should connect (formatted as host:port, separated by spaces or commas)`).
Envar("GUBLE_KAFKA_BROKERS")),
},
}
)

Expand Down
11 changes: 11 additions & 0 deletions server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func TestParsingOfEnvironmentVariables(t *testing.T) {
os.Setenv("GUBLE_NODE_REMOTES", "127.0.0.1:8080 127.0.0.1:20002")
defer os.Unsetenv("GUBLE_NODE_REMOTES")

os.Setenv("GUBLE_KAFKA_BROKERS", "127.0.0.1:9092 127.0.0.1:9091")
defer os.Unsetenv("GUBLE_KAFKA_BROKERS")

os.Setenv("GUBLE_SMS_KAFKA_TOPIC", "sms_reporting_topic")
defer os.Unsetenv("GUBLE_SMS_KAFKA_TOPIC")

// when we parse the arguments from environment variables
parseConfig()

Expand Down Expand Up @@ -143,6 +149,8 @@ func TestParsingArgs(t *testing.T) {
"--pg-password", "pg-password",
"--pg-dbname", "pg-dbname",
"--remotes", "127.0.0.1:8080 127.0.0.1:20002",
"--kafka-brokers", "127.0.0.1:9092 127.0.0.1:9091",
"--sms-kafka-topic", "sms_reporting_topic",
}

// when we parse the arguments from command-line flags
Expand Down Expand Up @@ -188,6 +196,9 @@ func assertArguments(a *assert.Assertions) {
a.Equal("dev", *Config.EnvName)
a.Equal("mem", *Config.Profile)

a.Equal("[ 127.0.0.1:9092 127.0.0.1:9091]", (*Config.KafkaProducer.Brokers).String())
a.Equal("sms_reporting_topic", *Config.SMS.KafkaReportingTopic)

assertClusterRemotes(a)
}

Expand Down
42 changes: 42 additions & 0 deletions server/configstring/configstringlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package configstring

import (
"strings"

"gopkg.in/alecthomas/kingpin.v2"
)

type List []string

func NewFromKingpin(settings kingpin.Settings) *List {
sl := make(List, 0)
settings.SetValue(&sl)
return &sl
}

func (sl *List) Set(value string) error {
delimiter := " "
if strings.Contains(value, ",") {
delimiter = ","
}
slice := strings.Split(value, delimiter)
for _, s := range slice {
if s != "" {
*sl = append(*sl, s)
}
}
return nil
}

func (sl *List) IsEmpty() bool {
return len(*sl) == 0
}

func (sl List) String() string {
res := "["
for _, s := range sl {
res = res + " " + s
}
res = res + "]"
return res
}
20 changes: 20 additions & 0 deletions server/configstring/configstringlist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package configstring

import (
"testing"
"github.com/cosminrentea/gobbler/testutil"
"github.com/stretchr/testify/assert"
"github.com/golang/mock/gomock"
)

func TestList_IsEmpty(t *testing.T) {
ctrl, finish := testutil.NewMockCtrl(t)
defer finish()

m := NewMockSettings(ctrl)
m.EXPECT().SetValue(gomock.Any())

cs := NewFromKingpin(m)

assert.True(t, cs.IsEmpty())
}
38 changes: 38 additions & 0 deletions server/configstring/mocks_kingpin_gen_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Automatically generated by MockGen. DO NOT EDIT!
// Source: gopkg.in/alecthomas/kingpin.v2 (interfaces: Settings)

package configstring

import (
gomock "github.com/golang/mock/gomock"
kingpin_v2 "gopkg.in/alecthomas/kingpin.v2"
)

// Mock of Settings interface
type MockSettings struct {
ctrl *gomock.Controller
recorder *_MockSettingsRecorder
}

// Recorder for MockSettings (not exported)
type _MockSettingsRecorder struct {
mock *MockSettings
}

func NewMockSettings(ctrl *gomock.Controller) *MockSettings {
mock := &MockSettings{ctrl: ctrl}
mock.recorder = &_MockSettingsRecorder{mock}
return mock
}

func (_m *MockSettings) EXPECT() *_MockSettingsRecorder {
return _m.recorder
}

func (_m *MockSettings) SetValue(_param0 kingpin_v2.Value) {
_m.ctrl.Call(_m, "SetValue", _param0)
}

func (_mr *_MockSettingsRecorder) SetValue(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "SetValue", arg0)
}
2 changes: 1 addition & 1 deletion server/connector/mocks_connector_gen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package connector
import (
context "context"
protocol "github.com/cosminrentea/gobbler/protocol"

router "github.com/cosminrentea/gobbler/server/router"
gomock "github.com/golang/mock/gomock"
http "net/http"
Expand Down
17 changes: 16 additions & 1 deletion server/gobbler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"syscall"

"github.com/Bogh/gcm"
"github.com/cosminrentea/gobbler/server/kafka"
"github.com/pkg/profile"
"golang.org/x/crypto/ssh/terminal"
)
Expand Down Expand Up @@ -112,6 +113,20 @@ var CreateModules = func(router router.Router) (modules []interface{}) {

modules = append(modules, rest.NewRestMessageAPI(router, "/api/"))

var kafkaProducer kafka.Producer
if (*Config.KafkaProducer.Brokers).IsEmpty() {
logger.Info("KafkaProducer: disabled")
} else {
logger.Info("KafkaProducer: enabled")
var errKafka error
kafkaProducer, errKafka = kafka.NewProducer(Config.KafkaProducer)
if errKafka != nil {
logger.WithError(errKafka).Error("Could not create KafkaProducer")
} else {
modules = append(modules, kafkaProducer)
}
}

if *Config.WS.Enabled {
if wsHandler, err := websocket.NewWSHandler(router, *Config.WS.Prefix); err != nil {
logger.WithError(err).Error("Error loading WSHandler module")
Expand Down Expand Up @@ -175,7 +190,7 @@ var CreateModules = func(router router.Router) (modules []interface{}) {
if *Config.SMS.APIKey == "" || *Config.SMS.APISecret == "" {
logger.Panic("The API Key has to be provided when NEXMO SMS connector is enabled")
}
nexmoSender, err := sms.NewNexmoSender(*Config.SMS.APIKey, *Config.SMS.APISecret)
nexmoSender, err := sms.NewNexmoSender(*Config.SMS.APIKey, *Config.SMS.APISecret, kafkaProducer, *Config.SMS.KafkaReportingTopic)
if err != nil {
logger.WithError(err).Error("Error creating Nexmo Sender")
}
Expand Down
69 changes: 69 additions & 0 deletions server/kafka/kafkaproducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package kafka

import (
"github.com/Shopify/sarama"
"github.com/cosminrentea/gobbler/server/configstring"
"github.com/cosminrentea/gobbler/server/service"
"time"
)

type Producer interface {
service.Startable
service.Stopable
Report(topic string, bytes []byte, key string)
}

type Config struct {
Brokers *configstring.List
}

type producer struct {
Config

asyncProducer sarama.AsyncProducer
}

func NewProducer(c Config) (Producer, error) {
logger.WithField("config", c).Info("NewProducer")
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V0_10_1_0
saramaConfig.Producer.Retry.Max = 10
saramaConfig.Producer.Flush.Frequency = time.Second
p, err := sarama.NewAsyncProducer(*c.Brokers, saramaConfig)
if err != nil {
logger.WithError(err).Error("Could not create AsyncProducer")
return nil, err
}
return &producer{
Config: c,
asyncProducer: p,
}, nil
}

func (p *producer) Report(topic string, bytes []byte, key string) {
logger.WithField("topic", topic).Debug("Reporting to Kafka topic")
p.asyncProducer.Input() <- &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(bytes),
}
}

func (p *producer) Start() error {
logger.Info("Start")
go func() {
for err := range p.asyncProducer.Errors() {
logger.WithError(err).Error("Could not write to Kafka")
}
}()
return nil
}

func (p *producer) Stop() error {
logger.Info("Stop")
if err := p.asyncProducer.Close(); err != nil {
logger.WithError(err).Error("Could not close Kafka Producer")
return err
}
return nil
}
7 changes: 7 additions & 0 deletions server/kafka/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package kafka

import (
log "github.com/Sirupsen/logrus"
)

var logger = log.WithField("module", "kafka")
Loading

0 comments on commit 334d2e6

Please sign in to comment.