Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
Merge 3deee5d into 5de5c4e
Browse files Browse the repository at this point in the history
  • Loading branch information
luan committed Sep 24, 2014
2 parents 5de5c4e + 3deee5d commit f446d5f
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 41 deletions.
7 changes: 3 additions & 4 deletions cfcomponent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@ type Config struct {
NatsPort int
NatsUser string
NatsPass string
MbusClient yagnats.ApceraWrapperNATSClient
MbusClient yagnats.NATSConn
}

var DefaultYagnatsClientProvider = func(logger *gosteno.Logger, c *Config) (natsClient yagnats.ApceraWrapperNATSClient, err error) {
var DefaultYagnatsClientProvider = func(logger *gosteno.Logger, c *Config) (natsClient yagnats.NATSConn, err error) {
members := make([]string, 0)
for _, natsHost := range c.NatsHosts {
members = append(members, fmt.Sprintf("nats://%s:%s@%s:%d", c.NatsUser, c.NatsPass, natsHost, c.NatsPort))
}

natsClient = yagnats.NewApceraClientWrapper(members)
err = natsClient.Connect()
natsClient, err = yagnats.Connect(members)
if err != nil {
return nil, errors.New(fmt.Sprintf("Could not connect to NATS: %v", err.Error()))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package collectorregistrar
import (
"encoding/json"
"fmt"
"time"

"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/loggregatorlib/cfcomponent"
"github.com/cloudfoundry/yagnats"
"time"
)

type ClientProvider func(*gosteno.Logger, *cfcomponent.Config) (yagnats.ApceraWrapperNATSClient, error)
type ClientProvider func(*gosteno.Logger, *cfcomponent.Config) (yagnats.NATSConn, error)

type CollectorRegistrar interface {
Run(stopChan <-chan struct{})
Expand All @@ -20,7 +21,7 @@ type collectorRegistrar struct {
interval time.Duration
logger *gosteno.Logger
cfc cfcomponent.Component
client yagnats.ApceraWrapperNATSClient
client yagnats.NATSConn
config *cfcomponent.Config
}

Expand All @@ -46,7 +47,7 @@ func (registrar *collectorRegistrar) Run(stopChan <-chan struct{}) {
err := registrar.announceMessage()
if err != nil {
if registrar.client != nil {
registrar.client.Disconnect()
registrar.client.Close()
registrar.client = nil
}
registrar.logger.Warn(err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"github.com/cloudfoundry/loggregatorlib/cfcomponent/registrars/collectorregistrar"

"errors"
"sync/atomic"
"time"

"github.com/apcera/nats"
"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/loggregatorlib/cfcomponent"
Expand All @@ -12,8 +15,6 @@ import (
"github.com/cloudfoundry/yagnats/fakeyagnats"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"sync/atomic"
"time"
)

var _ = Describe("Collectorregistrar", func() {
Expand All @@ -36,7 +37,7 @@ var _ = Describe("Collectorregistrar", func() {
return nil
}
fakeClientProviderCallCount = 0
fakeClientProvider := func(*gosteno.Logger, *cfcomponent.Config) (yagnats.ApceraWrapperNATSClient, error) {
fakeClientProvider := func(*gosteno.Logger, *cfcomponent.Config) (yagnats.NATSConn, error) {
atomic.AddInt32(&fakeClientProviderCallCount, 1)
return fakeClient, errorProvider()
}
Expand Down Expand Up @@ -90,32 +91,32 @@ var _ = Describe("Collectorregistrar", func() {
})

It("disconnects the client", func() {
Eventually(fakeClient.Disconnected).Should(BeTrue())
Eventually(fakeClient.Closed).Should(BeTrue())
})
})
})
})
})

type fakeClient struct {
*fakeyagnats.FakeApceraWrapper
disconnected bool
*fakeyagnats.FakeNATSConn
closed bool
}

func newFakeClient() *fakeClient {
return &fakeClient{
FakeApceraWrapper: fakeyagnats.NewApceraClientWrapper(),
FakeNATSConn: fakeyagnats.Connect(),
}
}

func (f *fakeClient) Disconnect() {
func (f *fakeClient) Close() {
f.Lock()
defer f.Unlock()
f.disconnected = true
f.closed = true
}

func (f *fakeClient) Disconnected() bool {
func (f *fakeClient) Closed() bool {
f.Lock()
defer f.Unlock()
return f.disconnected
return f.closed
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (

type collectorRegistrar struct {
*gosteno.Logger
mBusClient yagnats.ApceraWrapperNATSClient
mBusClient yagnats.NATSConn
}

func NewCollectorRegistrar(mBusClient yagnats.ApceraWrapperNATSClient, logger *gosteno.Logger) *collectorRegistrar {
func NewCollectorRegistrar(mBusClient yagnats.NATSConn, logger *gosteno.Logger) *collectorRegistrar {
return &collectorRegistrar{mBusClient: mBusClient, Logger: logger}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func TestAnnounceComponent(t *testing.T) {
cfc := getTestCFComponent()
mbus := fakeyagnats.NewApceraClientWrapper()
mbus := fakeyagnats.Connect()

called := make(chan *nats.Msg, 10)
mbus.Subscribe(AnnounceComponentMessageSubject, func(response *nats.Msg) {
Expand All @@ -31,7 +31,7 @@ func TestAnnounceComponent(t *testing.T) {

func TestSubscribeToComponentDiscover(t *testing.T) {
cfc := getTestCFComponent()
mbus := fakeyagnats.NewApceraClientWrapper()
mbus := fakeyagnats.Connect()

called := make(chan *nats.Msg, 10)
mbus.Subscribe(DiscoverComponentMessageSubject, func(response *nats.Msg) {
Expand All @@ -42,7 +42,7 @@ func TestSubscribeToComponentDiscover(t *testing.T) {
registrar.subscribeToComponentDiscover(cfc)

expectedJson, _ := createYagnatsMessage(t, DiscoverComponentMessageSubject)
mbus.PublishWithReplyTo(DiscoverComponentMessageSubject, "unused-reply", expectedJson)
mbus.PublishRequest(DiscoverComponentMessageSubject, "unused-reply", expectedJson)

payloadBytes := (<-called).Data
assert.Equal(t, expectedJson, payloadBytes)
Expand Down
6 changes: 3 additions & 3 deletions cfcomponent/registrars/routerregistrar/router_registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (

type registrar struct {
*gosteno.Logger
mBusClient yagnats.ApceraWrapperNATSClient
mBusClient yagnats.NATSConn
routerRegisterInterval time.Duration
lock sync.RWMutex
}

func NewRouterRegistrar(mBusClient yagnats.ApceraWrapperNATSClient, logger *gosteno.Logger) *registrar {
func NewRouterRegistrar(mBusClient yagnats.NATSConn, logger *gosteno.Logger) *registrar {
return &registrar{mBusClient: mBusClient, Logger: logger}
}

Expand Down Expand Up @@ -60,7 +60,7 @@ func (r *registrar) greetRouter() (err error) {
callback([]byte(msg.Data))
})

r.mBusClient.PublishWithReplyTo(RouterGreetMessageSubject, inbox, []byte{})
r.mBusClient.PublishRequest(RouterGreetMessageSubject, inbox, []byte{})

routerRegisterInterval := 20 * time.Second
timer := time.NewTimer(30 * time.Second)
Expand Down
16 changes: 8 additions & 8 deletions cfcomponent/registrars/routerregistrar/router_registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestGreetRouter(t *testing.T) {
routerReceivedChannel := make(chan *nats.Msg, 10)
resultChan := make(chan bool)

mbus := fakeyagnats.NewApceraClientWrapper()
mbus := fakeyagnats.Connect()
fakeRouter(mbus, routerReceivedChannel)
registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger())

Expand Down Expand Up @@ -50,7 +50,7 @@ func TestDefaultIntervalIsSetWhenGreetRouterFails(t *testing.T) {
routerReceivedChannel := make(chan *nats.Msg)
resultChan := make(chan bool)

mbus := fakeyagnats.NewApceraClientWrapper()
mbus := fakeyagnats.Connect()
fakeBrokenGreeterRouter(mbus, routerReceivedChannel)
registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger())

Expand Down Expand Up @@ -82,7 +82,7 @@ func TestDefaultIntervalIsSetWhenGreetRouterFails(t *testing.T) {
func TestDefaultIntervalIsSetWhenGreetWithoutRouter(t *testing.T) {
resultChan := make(chan bool)

mbus := fakeyagnats.NewApceraClientWrapper()
mbus := fakeyagnats.Connect()
registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger())

go func() {
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestDefaultIntervalIsSetWhenGreetWithoutRouter(t *testing.T) {
}

func TestKeepRegisteringWithRouter(t *testing.T) {
mbus := fakeyagnats.NewApceraClientWrapper()
mbus := fakeyagnats.Connect()
os.Setenv("LOG_TO_STDOUT", "false")
routerReceivedChannel := make(chan *nats.Msg)
fakeRouter(mbus, routerReceivedChannel)
Expand All @@ -132,7 +132,7 @@ func TestKeepRegisteringWithRouter(t *testing.T) {
}

func TestSubscribeToRouterStart(t *testing.T) {
mbus := fakeyagnats.NewApceraClientWrapper()
mbus := fakeyagnats.Connect()
registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger())
registrar.subscribeToRouterStart()

Expand Down Expand Up @@ -161,7 +161,7 @@ func TestSubscribeToRouterStart(t *testing.T) {
}

func TestUnregisterFromRouter(t *testing.T) {
mbus := fakeyagnats.NewApceraClientWrapper()
mbus := fakeyagnats.Connect()
routerReceivedChannel := make(chan *nats.Msg, 10)
fakeRouter(mbus, routerReceivedChannel)

Expand All @@ -183,7 +183,7 @@ const messageFromRouter = `{
"minimumRegisterIntervalInSeconds": 42
}`

func fakeRouter(mbus *fakeyagnats.FakeApceraWrapper, returnChannel chan *nats.Msg) {
func fakeRouter(mbus *fakeyagnats.FakeNATSConn, returnChannel chan *nats.Msg) {
mbus.Subscribe("router.greet", func(msg *nats.Msg) {
mbus.Publish(msg.Reply, []byte(messageFromRouter))
})
Expand All @@ -208,7 +208,7 @@ func fakeRouter(mbus *fakeyagnats.FakeApceraWrapper, returnChannel chan *nats.Ms
})
}

func fakeBrokenGreeterRouter(mbus *fakeyagnats.FakeApceraWrapper, returnChannel chan *nats.Msg) {
func fakeBrokenGreeterRouter(mbus *fakeyagnats.FakeNATSConn, returnChannel chan *nats.Msg) {

mbus.Subscribe("router.greet", func(msg *nats.Msg) {
mbus.Publish(msg.Reply, []byte("garbel garbel"))
Expand Down
4 changes: 2 additions & 2 deletions server/handlers/http_handler.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package handlers

import (
"github.com/cloudfoundry/gosteno"
"mime/multipart"
"net/http"
"github.com/cloudfoundry/gosteno"
)

type httpHandler struct {
messages <-chan []byte
logger *gosteno.Logger
logger *gosteno.Logger
}

func NewHttpHandler(m <-chan []byte, logger *gosteno.Logger) *httpHandler {
Expand Down
2 changes: 1 addition & 1 deletion server/handlers/http_handler_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handlers_test

import (
"github.com/cloudfoundry/loggregatorlib/loggertesthelper"
"github.com/cloudfoundry/loggregatorlib/server/handlers"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand All @@ -9,7 +10,6 @@ import (
"net/http"
"net/http/httptest"
"regexp"
"github.com/cloudfoundry/loggregatorlib/loggertesthelper"
)

var _ = Describe("HttpHandler", func() {
Expand Down
4 changes: 2 additions & 2 deletions server/handlers/websocket_handler.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package handlers

import (
"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/loggregatorlib/server"
"github.com/gorilla/websocket"
"net/http"
"time"
"github.com/cloudfoundry/gosteno"
)

type websocketHandler struct {
messages <-chan []byte
keepAlive time.Duration
logger *gosteno.Logger
logger *gosteno.Logger
}

func NewWebsocketHandler(m <-chan []byte, keepAlive time.Duration, logger *gosteno.Logger) *websocketHandler {
Expand Down
2 changes: 1 addition & 1 deletion server/handlers/websocket_handler_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package handlers_test

import (
"github.com/cloudfoundry/loggregatorlib/loggertesthelper"
"github.com/cloudfoundry/loggregatorlib/server/handlers"
"github.com/gorilla/websocket"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"net/http"
"net/http/httptest"
"time"
"github.com/cloudfoundry/loggregatorlib/loggertesthelper"
)

var _ = Describe("WebsocketHandler", func() {
Expand Down

0 comments on commit f446d5f

Please sign in to comment.