Skip to content

Commit

Permalink
Merge with master
Browse files Browse the repository at this point in the history
  • Loading branch information
Marian Craciunescu committed Apr 5, 2017
2 parents c849803 + 334d2e6 commit dcd06e4
Show file tree
Hide file tree
Showing 23 changed files with 401 additions and 73 deletions.
18 changes: 9 additions & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ services:
before_script:
- psql -c 'create database guble;' -U postgres
before_install:
- go get github.com/wadey/gocovmerge
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
- go get github.com/wadey/gocovmerge github.com/mattn/goveralls golang.org/x/tools/cmd/cover
script:
- GO_TEST_DISABLED=true go test -v ./...
- GO_TEST_DISABLED=true go test -v ./... && GO_TEST_DISABLED=true go test -v -tags cluster ./...
after_success:
- scripts/generate_coverage.sh
- goveralls -coverprofile=full_cov.out -service=travis-ci
- if [ "$TRAVIS_BRANCH" == "master" ]; then
GOOS=linux go build -a --ldflags '-linkmode external -extldflags "-static"' . ;
GOOS=linux go build -a --ldflags '-linkmode external -extldflags "-static"' -o ./guble-cli/guble-cli ./guble-cli ;
fi

# - if [ "$TRAVIS_BRANCH" == "master" ]; then
# GOOS=linux go build -a --ldflags '-linkmode external -extldflags "-static"' . ;
# GOOS=linux go build -a --ldflags '-linkmode external -extldflags "-static"' -o ./guble-cli/guble-cli ./guble-cli ;
# fi
# docker build -t cosminrentea/gobbler . ;
# docker login -e="$DOCKER_EMAIL" -u="$DOCKER_USERNAME" -p="$DOCKER_PASSWORD" ;
# docker push cosminrentea/gobbler ;
env:
global:
- secure: V9+UswYO6l0EuekA5YBviUdz0OcWfT3QsY1Bgoml8lmWP3/Rdq0fpxGh1hHUWt1pyAl3Aymw5Sc9DU/STmb5k6YjimS649Hu3jZ2AJfjLxh8ZA+vTgFiQc4mN4FqDAFhnPVB/aOSQhGyRlWalxikNy3nhcJrN+uWOpzRqzg0icNOdfTKpSH1cRJO0Ja34f4AEmLuNvGUAyZVpLuZJFL5mE9sJ1G1baqgFf/kTQ67jF+Ezg+1AY+NYaYwd5PUGFgIKf/qVT5Wqtrff26Yxzr/hECEBypAvNmCdLSoV/qyzZvzUTgYZTPmUnDks0uUEup9YzEQZ9XxwIQyHSXZ9D6h2vZxyr0TlZvBtdzWNiLHjBSISF8ZzOthI7NIi/e4YRYlqCF3apZuRo6o2fneHqzonza0OpJQdCKACXgycFe0ZTXk1o7SdT1d1JgeFckmL0kS8H2N4E/DaIAPq8zaC4bOlaYaUYt6vXNwEKK99q0X97gLJFdrBBY7lzKs9bbVa7b2Dhkh67PUt6WhoHUjLSN+9jTn+oda8VEKtXxyaWM6AsCRHgBiy0VaxuHbU2k1mpSCLdBfJGbrDITA4+nyPopv/oky4xHX1FGSMGFw73Ejafu9Xo0cpvIpVcNjeagUugQ5ThPQMSua9hxSZJx6alIUhptDUesiYHJAWUVPQi4N/3A=
Expand Down
6 changes: 6 additions & 0 deletions scripts/generate_mocks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,10 @@ $MOCKGEN -self_package router -package sms \
github.com/cosminrentea/gobbler/server/store \
MessageStore &

# server/configstring mocks
$MOCKGEN -package configstring \
-destination server/configstring/mocks_kingpin_gen_test.go \
gopkg.in/alecthomas/kingpin.v2 \
Settings &

wait
2 changes: 1 addition & 1 deletion server/apns/apns_pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func newApns2Client(certificate tls.Certificate) *apns2Client {
return c
}

// interface closable used used by apns_sender
// interface closable used by apns_sender
func (c *apns2Client) CloseTLS() {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
18 changes: 9 additions & 9 deletions server/benchmarking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type testgroup struct {
t *testing.T
groupID int
addr string
done chan bool
doneC chan bool
messagesToSend int
consumer, publisher client.Client
topic string
Expand All @@ -30,7 +30,7 @@ func newTestgroup(t *testing.T, groupID int, addr string, messagesToSend int) *t
t: t,
groupID: groupID,
addr: addr,
done: make(chan bool),
doneC: make(chan bool),
messagesToSend: messagesToSend,
}
}
Expand Down Expand Up @@ -89,13 +89,13 @@ func TestThroughput(t *testing.T) {
log.Print("wait for finishing")
for i, test := range testgroups {
select {
case successFlag := <-test.done:
case successFlag := <-test.doneC:
if !successFlag {
t.Logf("testgroup %v returned with error", i)
t.FailNow()
return
}
case <-time.After(time.Second * 20):
case <-time.After(time.Second * 10):
t.Log("timeout. testgroups not ready before timeout")
t.Fail()
return
Expand Down Expand Up @@ -139,7 +139,7 @@ func (tg *testgroup) expectStatusMessage(name string, arg string) {
assert.Equal(tg.t, arg, notify.Arg)
case <-time.After(time.Second * 1):
tg.t.Logf("[%v] no notification of type %s until timeout", tg.groupID, name)
tg.done <- false
tg.doneC <- false
tg.t.Fail()
return
}
Expand All @@ -161,21 +161,21 @@ func (tg *testgroup) Start() {
assert.Equal(tg.t, tg.topic, string(msg.Path))
if !assert.Equal(tg.t, body, string(msg.Body)) {
tg.t.FailNow()
tg.done <- false
tg.doneC <- false
}
case msg := <-tg.consumer.Errors():
tg.t.Logf("[%v] received error: %v", tg.groupID, msg)
tg.done <- false
tg.doneC <- false
tg.t.Fail()
return
case <-time.After(time.Second * 5):
tg.t.Logf("[%v] no message received until timeout, expected message %v", tg.groupID, i)
tg.done <- false
tg.doneC <- false
tg.t.Fail()
return
}
}
tg.done <- true
tg.doneC <- true
}

func (tg *testgroup) Clean() {
Expand Down
2 changes: 2 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"
"net"
"strconv"
"time"
)

var (
Expand Down Expand Up @@ -122,6 +123,7 @@ func (cluster *Cluster) Stop() error {
if cluster.synchronizer != nil {
close(cluster.synchronizer.stopC)
}
cluster.memberlist.Leave(time.Second)
return cluster.memberlist.Shutdown()
}

Expand Down
45 changes: 33 additions & 12 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package cluster

import (
"io/ioutil"

"github.com/cosminrentea/gobbler/server/store/filestore"

"github.com/cosminrentea/gobbler/protocol"
Expand All @@ -12,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"

"errors"
"io/ioutil"
"net"
"testing"
"time"
Expand Down Expand Up @@ -51,13 +50,14 @@ func TestCluster_StartCheckStop(t *testing.T) {
node.Router = newDummyRouter(t)

err = node.Start()
defer func() {
err := node.Stop()
a.NoError(err, "No error should be raised when Stopping the Cluster")
}()
a.NoError(err, "No error should be raised when Starting the Cluster")

err = node.Check()
a.NoError(err, "Health-check score of a Cluster with a single node should be OK")

err = node.Stop()
a.NoError(err, "No error should be raised when Stopping the Cluster")
}

func TestCluster_BroadcastStringAndMessageAndCheck(t *testing.T) {
Expand All @@ -70,8 +70,11 @@ func TestCluster_BroadcastStringAndMessageAndCheck(t *testing.T) {
node1.Router = newDummyRouter(t)

//start the cluster node 1
defer node1.Stop()
err = node1.Start()
defer func() {
err := node1.Stop()
a.NoError(err, "No error should be raised when Stopping the Cluster")
}()
a.NoError(err, "No error should be raised when starting node 1 of the Cluster")

config2 := testConfigAnother()
Expand All @@ -81,8 +84,11 @@ func TestCluster_BroadcastStringAndMessageAndCheck(t *testing.T) {
node2.Router = newDummyRouter(t)

//start the cluster node 2
defer node2.Stop()
err = node2.Start()
defer func() {
err := node2.Stop()
a.NoError(err, "No error should be raised when Stopping the Cluster")
}()
a.NoError(err, "No error should be raised when starting node 2 of the Cluster")

// Send a String Message
Expand Down Expand Up @@ -135,8 +141,11 @@ func TestCluster_StartShouldReturnErrorWhenNoRemotes(t *testing.T) {

node.Router = newDummyRouter(t)

defer node.Stop()
err = node.Start()
defer func() {
err := node.Stop()
a.NoError(err, "No error should be raised when Stopping the Cluster")
}()
if a.Error(err, "An error is expected when Starting the Cluster") {
a.Equal(err, errors.New("No remote hosts were successfully contacted when this node wanted to join the cluster"),
"Error should be precisely defined")
Expand All @@ -157,8 +166,11 @@ func TestCluster_StartShouldReturnErrorWhenInvalidRemotes(t *testing.T) {

node.Router = newDummyRouter(t)

defer node.Stop()
err = node.Start()
defer func() {
err := node.Stop()
a.NoError(err, "No error should be raised when Stopping the Cluster")
}()
if a.Error(err, "An error is expected when Starting the Cluster") {
expected := multierror.Append(errors.New("Failed to join 127.0.0.1: dial tcp 127.0.0.1:0: getsockopt: connection refused"))
a.Equal(err, expected, "Error should be precisely defined")
Expand All @@ -172,8 +184,11 @@ func TestCluster_StartShouldReturnErrorWhenNoMessageHandler(t *testing.T) {
node, err := New(&config)
a.NoError(err, "No error should be raised when Creating the Cluster")

defer node.Stop()
err = node.Start()
defer func() {
err := node.Stop()
a.NoError(err, "No error should be raised when Stopping the Cluster")
}()
if a.Error(err, "An error is expected when Starting the Cluster") {
expected := errors.New("There should be a valid Router already set-up")
a.Equal(expected, err, "Error should be precisely defined")
Expand All @@ -189,8 +204,11 @@ func TestCluster_NotifyMsgShouldSimplyReturnWhenDecodingInvalidMessage(t *testin

node.Router = newDummyRouter(t)

defer node.Stop()
err = node.Start()
defer func() {
err := node.Stop()
a.NoError(err, "No error should be raised when Stopping the Cluster")
}()
a.NoError(err, "No error should be raised when Starting the Cluster")

node.NotifyMsg([]byte{})
Expand All @@ -207,8 +225,11 @@ func TestCluster_broadcastClusterMessage(t *testing.T) {

node.Router = newDummyRouter(t)

defer node.Stop()
err = node.Start()
defer func() {
err := node.Stop()
a.NoError(err, "No error should be raised when Stopping the Cluster")
}()
a.NoError(err, "No error should be raised when Starting the Cluster")

err = node.broadcastClusterMessage(nil)
Expand Down
2 changes: 2 additions & 0 deletions server/cluster_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build cluster

package server

import (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build cluster

package server

import (
Expand Down
11 changes: 11 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"strings"

"github.com/cosminrentea/gobbler/server/apns"
"github.com/cosminrentea/gobbler/server/configstring"
"github.com/cosminrentea/gobbler/server/fcm"
"github.com/cosminrentea/gobbler/server/kafka"
"github.com/cosminrentea/gobbler/server/sms"
"github.com/cosminrentea/gobbler/server/websocket"
)
Expand Down Expand Up @@ -75,6 +77,7 @@ type (
APNS apns.Config
SMS sms.Config
WS websocket.Config
KafkaProducer kafka.Config
Cluster ClusterConfig
}
)
Expand Down Expand Up @@ -223,6 +226,9 @@ var (
Default(strconv.Itoa(runtime.NumCPU())).
Envar("GUBLE_SMS_WORKERS").
Int(),
KafkaReportingTopic:kingpin.Flag("sms-kafka-topic", "The name of the SMS-Reporting Kafka topic").
Envar("GUBLE_SMS_KAFKA_TOPIC").
String(),
IntervalMetrics: &defaultSMSMetrics,
},
WS: websocket.Config{
Expand All @@ -234,6 +240,11 @@ var (
Default("/stream/").
String(),
},
KafkaProducer: kafka.Config{
Brokers: configstring.NewFromKingpin(
kingpin.Flag("kafka-brokers", `The list Kafka brokers to which Guble should connect (formatted as host:port, separated by spaces or commas)`).
Envar("GUBLE_KAFKA_BROKERS")),
},
}
)

Expand Down
11 changes: 11 additions & 0 deletions server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func TestParsingOfEnvironmentVariables(t *testing.T) {
os.Setenv("GUBLE_NODE_REMOTES", "127.0.0.1:8080 127.0.0.1:20002")
defer os.Unsetenv("GUBLE_NODE_REMOTES")

os.Setenv("GUBLE_KAFKA_BROKERS", "127.0.0.1:9092 127.0.0.1:9091")
defer os.Unsetenv("GUBLE_KAFKA_BROKERS")

os.Setenv("GUBLE_SMS_KAFKA_TOPIC", "sms_reporting_topic")
defer os.Unsetenv("GUBLE_SMS_KAFKA_TOPIC")

// when we parse the arguments from environment variables
parseConfig()

Expand Down Expand Up @@ -143,6 +149,8 @@ func TestParsingArgs(t *testing.T) {
"--pg-password", "pg-password",
"--pg-dbname", "pg-dbname",
"--remotes", "127.0.0.1:8080 127.0.0.1:20002",
"--kafka-brokers", "127.0.0.1:9092 127.0.0.1:9091",
"--sms-kafka-topic", "sms_reporting_topic",
}

// when we parse the arguments from command-line flags
Expand Down Expand Up @@ -188,6 +196,9 @@ func assertArguments(a *assert.Assertions) {
a.Equal("dev", *Config.EnvName)
a.Equal("mem", *Config.Profile)

a.Equal("[ 127.0.0.1:9092 127.0.0.1:9091]", (*Config.KafkaProducer.Brokers).String())
a.Equal("sms_reporting_topic", *Config.SMS.KafkaReportingTopic)

assertClusterRemotes(a)
}

Expand Down
42 changes: 42 additions & 0 deletions server/configstring/configstringlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package configstring

import (
"strings"

"gopkg.in/alecthomas/kingpin.v2"
)

type List []string

func NewFromKingpin(settings kingpin.Settings) *List {
sl := make(List, 0)
settings.SetValue(&sl)
return &sl
}

func (sl *List) Set(value string) error {
delimiter := " "
if strings.Contains(value, ",") {
delimiter = ","
}
slice := strings.Split(value, delimiter)
for _, s := range slice {
if s != "" {
*sl = append(*sl, s)
}
}
return nil
}

func (sl *List) IsEmpty() bool {
return len(*sl) == 0
}

func (sl List) String() string {
res := "["
for _, s := range sl {
res = res + " " + s
}
res = res + "]"
return res
}
Loading

0 comments on commit dcd06e4

Please sign in to comment.