Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
Merge pull request #6 from cloudfoundry/use-new-yagnats-wrappers
Browse files Browse the repository at this point in the history
Support new yagnats
  • Loading branch information
poy committed Sep 17, 2014
2 parents 3c4a0b0 + bc3c7c8 commit 1b33526
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 78 deletions.
19 changes: 6 additions & 13 deletions cfcomponent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,17 @@ type Config struct {
NatsPort int
NatsUser string
NatsPass string
MbusClient yagnats.NATSClient
MbusClient yagnats.ApceraWrapperNATSClient
}

var DefaultYagnatsClientProvider = func(logger *gosteno.Logger, c *Config) (natsClient yagnats.NATSClient, err error) {
members := []yagnats.ConnectionProvider{}
var DefaultYagnatsClientProvider = func(logger *gosteno.Logger, c *Config) (natsClient yagnats.ApceraWrapperNATSClient, err error) {
members := make([]string, 0)
for _, natsHost := range c.NatsHosts {
members = append(members, &yagnats.ConnectionInfo{
Addr: fmt.Sprintf("%s:%d", natsHost, c.NatsPort),
Username: c.NatsUser,
Password: c.NatsPass,
})
members = append(members, fmt.Sprintf("nats://%s:%s@%s:%d", c.NatsUser, c.NatsPass, natsHost, c.NatsPort))
}

connectionInfo := &yagnats.ConnectionCluster{
Members: members,
}
natsClient = yagnats.NewClient()
err = natsClient.Connect(connectionInfo)
natsClient = yagnats.NewApceraClientWrapper(members)
err = natsClient.Connect()
if err != nil {
return nil, errors.New(fmt.Sprintf("Could not connect to NATS: %v", err.Error()))
}
Expand Down
12 changes: 7 additions & 5 deletions cfcomponent/registrars/collectorregistrar/collector_registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package collectorregistrar

import (
"encoding/json"

"github.com/apcera/nats"
"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/loggregatorlib/cfcomponent"
"github.com/cloudfoundry/yagnats"
)

type collectorRegistrar struct {
*gosteno.Logger
mBusClient yagnats.NATSClient
mBusClient yagnats.ApceraWrapperNATSClient
}

func NewCollectorRegistrar(mBusClient yagnats.NATSClient, logger *gosteno.Logger) *collectorRegistrar {
func NewCollectorRegistrar(mBusClient yagnats.ApceraWrapperNATSClient, logger *gosteno.Logger) *collectorRegistrar {
return &collectorRegistrar{mBusClient: mBusClient, Logger: logger}
}

Expand All @@ -34,13 +36,13 @@ func (r collectorRegistrar) announceComponent(cfc cfcomponent.Component) error {
}

func (r collectorRegistrar) subscribeToComponentDiscover(cfc cfcomponent.Component) {
var callback yagnats.Callback
callback = func(msg *yagnats.Message) {
var callback nats.MsgHandler
callback = func(msg *nats.Msg) {
json, err := json.Marshal(NewAnnounceComponentMessage(cfc))
if err != nil {
r.Warnf("Failed to marshal response to message [%s]: %s", DiscoverComponentMessageSubject, err.Error())
}
r.mBusClient.Publish(msg.ReplyTo, json)
r.mBusClient.Publish(msg.Reply, json)
}

r.mBusClient.Subscribe(DiscoverComponentMessageSubject, callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@ package collectorregistrar

import (
"encoding/json"
"testing"

"github.com/apcera/nats"
"github.com/cloudfoundry/loggregatorlib/cfcomponent"
"github.com/cloudfoundry/loggregatorlib/loggertesthelper"
"github.com/cloudfoundry/yagnats"
"github.com/cloudfoundry/yagnats/fakeyagnats"
"github.com/stretchr/testify/assert"
"testing"
)

func TestAnnounceComponent(t *testing.T) {
cfc := getTestCFComponent()
mbus := fakeyagnats.New()
mbus := fakeyagnats.NewApceraClientWrapper()

called := make(chan *yagnats.Message, 10)
mbus.Subscribe(AnnounceComponentMessageSubject, func(response *yagnats.Message) {
called := make(chan *nats.Msg, 10)
mbus.Subscribe(AnnounceComponentMessageSubject, func(response *nats.Msg) {
called <- response
})

Expand All @@ -24,16 +25,16 @@ func TestAnnounceComponent(t *testing.T) {

expectedJson, _ := createYagnatsMessage(t, AnnounceComponentMessageSubject)

payloadBytes := (<-called).Payload
payloadBytes := (<-called).Data
assert.Equal(t, expectedJson, payloadBytes)
}

func TestSubscribeToComponentDiscover(t *testing.T) {
cfc := getTestCFComponent()
mbus := fakeyagnats.New()
mbus := fakeyagnats.NewApceraClientWrapper()

called := make(chan *yagnats.Message, 10)
mbus.Subscribe(DiscoverComponentMessageSubject, func(response *yagnats.Message) {
called := make(chan *nats.Msg, 10)
mbus.Subscribe(DiscoverComponentMessageSubject, func(response *nats.Msg) {
called <- response
})

Expand All @@ -43,11 +44,11 @@ func TestSubscribeToComponentDiscover(t *testing.T) {
expectedJson, _ := createYagnatsMessage(t, DiscoverComponentMessageSubject)
mbus.PublishWithReplyTo(DiscoverComponentMessageSubject, "unused-reply", expectedJson)

payloadBytes := (<-called).Payload
payloadBytes := (<-called).Data
assert.Equal(t, expectedJson, payloadBytes)
}

func createYagnatsMessage(t *testing.T, subject string) ([]byte, *yagnats.Message) {
func createYagnatsMessage(t *testing.T, subject string) ([]byte, *nats.Msg) {

expected := &AnnounceComponentMessage{
Type: "Loggregator Server",
Expand All @@ -60,10 +61,10 @@ func createYagnatsMessage(t *testing.T, subject string) ([]byte, *yagnats.Messag
expectedJson, err := json.Marshal(expected)
assert.NoError(t, err)

yagnatsMsg := &yagnats.Message{
yagnatsMsg := &nats.Msg{
Subject: subject,
ReplyTo: "reply_to",
Payload: expectedJson,
Reply: "reply_to",
Data: expectedJson,
}

return expectedJson, yagnatsMsg
Expand Down
18 changes: 10 additions & 8 deletions cfcomponent/registrars/routerregistrar/router_registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,23 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/apcera/nats"
"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/yagnats"
uuid "github.com/nu7hatch/gouuid"
"sync"
"time"
)

type registrar struct {
*gosteno.Logger
mBusClient yagnats.NATSClient
mBusClient yagnats.ApceraWrapperNATSClient
routerRegisterInterval time.Duration
lock sync.RWMutex
}

func NewRouterRegistrar(mBusClient yagnats.NATSClient, logger *gosteno.Logger) *registrar {
func NewRouterRegistrar(mBusClient yagnats.ApceraWrapperNATSClient, logger *gosteno.Logger) *registrar {
return &registrar{mBusClient: mBusClient, Logger: logger}
}

Expand Down Expand Up @@ -54,8 +56,8 @@ func (r *registrar) greetRouter() (err error) {
return err
}

r.mBusClient.Subscribe(inbox, func(msg *yagnats.Message) {
callback([]byte(msg.Payload))
r.mBusClient.Subscribe(inbox, func(msg *nats.Msg) {
callback([]byte(msg.Data))
})

r.mBusClient.PublishWithReplyTo(RouterGreetMessageSubject, inbox, []byte{})
Expand Down Expand Up @@ -85,8 +87,8 @@ func (r *registrar) greetRouter() (err error) {
}

func (r *registrar) subscribeToRouterStart() {
r.mBusClient.Subscribe(RouterStartMessageSubject, func(msg *yagnats.Message) {
payload := msg.Payload
r.mBusClient.Subscribe(RouterStartMessageSubject, func(msg *nats.Msg) {
payload := msg.Data
routerResponse := &RouterResponse{}
err := json.Unmarshal(payload, routerResponse)
if err != nil {
Expand Down
65 changes: 33 additions & 32 deletions cfcomponent/registrars/routerregistrar/router_registrar_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package routerregistrar

import (
"github.com/cloudfoundry/loggregatorlib/loggertesthelper"
"github.com/cloudfoundry/yagnats"
"github.com/cloudfoundry/yagnats/fakeyagnats"
"github.com/stretchr/testify/assert"
"os"
"testing"
"time"

"github.com/apcera/nats"
"github.com/cloudfoundry/loggregatorlib/loggertesthelper"
"github.com/cloudfoundry/yagnats/fakeyagnats"
"github.com/stretchr/testify/assert"
)

func TestGreetRouter(t *testing.T) {
routerReceivedChannel := make(chan *yagnats.Message, 10)
routerReceivedChannel := make(chan *nats.Msg, 10)
resultChan := make(chan bool)

mbus := fakeyagnats.New()
mbus := fakeyagnats.NewApceraClientWrapper()
fakeRouter(mbus, routerReceivedChannel)
registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger())

Expand Down Expand Up @@ -46,10 +47,10 @@ func TestGreetRouter(t *testing.T) {
}

func TestDefaultIntervalIsSetWhenGreetRouterFails(t *testing.T) {
routerReceivedChannel := make(chan *yagnats.Message)
routerReceivedChannel := make(chan *nats.Msg)
resultChan := make(chan bool)

mbus := fakeyagnats.New()
mbus := fakeyagnats.NewApceraClientWrapper()
fakeBrokenGreeterRouter(mbus, routerReceivedChannel)
registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger())

Expand Down Expand Up @@ -81,7 +82,7 @@ func TestDefaultIntervalIsSetWhenGreetRouterFails(t *testing.T) {
func TestDefaultIntervalIsSetWhenGreetWithoutRouter(t *testing.T) {
resultChan := make(chan bool)

mbus := fakeyagnats.New()
mbus := fakeyagnats.NewApceraClientWrapper()
registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger())

go func() {
Expand Down Expand Up @@ -110,9 +111,9 @@ func TestDefaultIntervalIsSetWhenGreetWithoutRouter(t *testing.T) {
}

func TestKeepRegisteringWithRouter(t *testing.T) {
mbus := fakeyagnats.New()
mbus := fakeyagnats.NewApceraClientWrapper()
os.Setenv("LOG_TO_STDOUT", "false")
routerReceivedChannel := make(chan *yagnats.Message)
routerReceivedChannel := make(chan *nats.Msg)
fakeRouter(mbus, routerReceivedChannel)

registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger())
Expand All @@ -123,15 +124,15 @@ func TestKeepRegisteringWithRouter(t *testing.T) {
time.Sleep(55 * time.Millisecond)
select {
case msg := <-routerReceivedChannel:
assert.Equal(t, `registering:{"host":"13.12.14.15","port":8083,"uris":["foobar.vcap.me"]}`, string(msg.Payload))
assert.Equal(t, `registering:{"host":"13.12.14.15","port":8083,"uris":["foobar.vcap.me"]}`, string(msg.Data))
default:
t.Error("Router did not receive a router.register in time!")
}
}
}

func TestSubscribeToRouterStart(t *testing.T) {
mbus := fakeyagnats.New()
mbus := fakeyagnats.NewApceraClientWrapper()
registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger())
registrar.subscribeToRouterStart()

Expand Down Expand Up @@ -160,8 +161,8 @@ func TestSubscribeToRouterStart(t *testing.T) {
}

func TestUnregisterFromRouter(t *testing.T) {
mbus := fakeyagnats.New()
routerReceivedChannel := make(chan *yagnats.Message, 10)
mbus := fakeyagnats.NewApceraClientWrapper()
routerReceivedChannel := make(chan *nats.Msg, 10)
fakeRouter(mbus, routerReceivedChannel)

registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger())
Expand All @@ -170,7 +171,7 @@ func TestUnregisterFromRouter(t *testing.T) {
select {
case msg := <-routerReceivedChannel:
host := "13.12.14.15"
assert.Equal(t, `unregistering:{"host":"`+host+`","port":8083,"uris":["foobar.vcap.me"]}`, string(msg.Payload))
assert.Equal(t, `unregistering:{"host":"`+host+`","port":8083,"uris":["foobar.vcap.me"]}`, string(msg.Data))
case <-time.After(2 * time.Second):
t.Error("Router did not receive a router.unregister in time!")
}
Expand All @@ -182,34 +183,34 @@ const messageFromRouter = `{
"minimumRegisterIntervalInSeconds": 42
}`

func fakeRouter(mbus *fakeyagnats.FakeYagnats, returnChannel chan *yagnats.Message) {
mbus.Subscribe("router.greet", func(msg *yagnats.Message) {
mbus.Publish(msg.ReplyTo, []byte(messageFromRouter))
func fakeRouter(mbus *fakeyagnats.FakeApceraWrapper, returnChannel chan *nats.Msg) {
mbus.Subscribe("router.greet", func(msg *nats.Msg) {
mbus.Publish(msg.Reply, []byte(messageFromRouter))
})

mbus.Subscribe("router.register", func(msg *yagnats.Message) {
returnChannel <- &yagnats.Message{
mbus.Subscribe("router.register", func(msg *nats.Msg) {
returnChannel <- &nats.Msg{
Subject: msg.Subject,
ReplyTo: msg.ReplyTo,
Payload: []byte("registering:" + string(msg.Payload)),
Reply: msg.Reply,
Data: []byte("registering:" + string(msg.Data)),
}

mbus.Publish(msg.ReplyTo, msg.Payload)
mbus.Publish(msg.Reply, msg.Data)
})

mbus.Subscribe("router.unregister", func(msg *yagnats.Message) {
returnChannel <- &yagnats.Message{
mbus.Subscribe("router.unregister", func(msg *nats.Msg) {
returnChannel <- &nats.Msg{
Subject: msg.Subject,
ReplyTo: msg.ReplyTo,
Payload: []byte("unregistering:" + string(msg.Payload)),
Reply: msg.Reply,
Data: []byte("unregistering:" + string(msg.Data)),
}
mbus.Publish(msg.ReplyTo, msg.Payload)
mbus.Publish(msg.Reply, msg.Data)
})
}

func fakeBrokenGreeterRouter(mbus *fakeyagnats.FakeYagnats, returnChannel chan *yagnats.Message) {
func fakeBrokenGreeterRouter(mbus *fakeyagnats.FakeApceraWrapper, returnChannel chan *nats.Msg) {

mbus.Subscribe("router.greet", func(msg *yagnats.Message) {
mbus.Publish(msg.ReplyTo, []byte("garbel garbel"))
mbus.Subscribe("router.greet", func(msg *nats.Msg) {
mbus.Publish(msg.Reply, []byte("garbel garbel"))
})
}
12 changes: 6 additions & 6 deletions clientpool/loggregator_client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ import (
var ErrorEmptyClientPool = errors.New("loggregator client pool is empty")

type LoggregatorClientPool struct {
clients map[string]loggregatorclient.LoggregatorClient
logger *gosteno.Logger
loggregatorPort int
clients map[string]loggregatorclient.LoggregatorClient
logger *gosteno.Logger
loggregatorPort int
sync.RWMutex
}

func NewLoggregatorClientPool(logger *gosteno.Logger, port int) *LoggregatorClientPool {
return &LoggregatorClientPool{
loggregatorPort: port,
clients: make(map[string]loggregatorclient.LoggregatorClient),
logger: logger,
loggregatorPort: port,
clients: make(map[string]loggregatorclient.LoggregatorClient),
logger: logger,
}
}

Expand Down

0 comments on commit 1b33526

Please sign in to comment.