From 3deee5dcb6704653e78d5f9fa15ee5db1c33dd71 Mon Sep 17 00:00:00 2001 From: Julian Friedman and Luan Santos Date: Wed, 24 Sep 2014 14:59:28 -0700 Subject: [PATCH] Use thinner, new yagnats interface --- cfcomponent/config.go | 7 +++--- .../collectorregistrar/collector_registrar.go | 9 ++++---- .../collectorregistrar_test.go | 23 ++++++++++--------- .../collector_registrar.go | 4 ++-- .../collector_registrar_test.go | 6 ++--- .../routerregistrar/router_registrar.go | 6 ++--- .../routerregistrar/router_registrar_test.go | 16 ++++++------- server/handlers/http_handler.go | 4 ++-- server/handlers/http_handler_test.go | 2 +- server/handlers/websocket_handler.go | 4 ++-- server/handlers/websocket_handler_test.go | 2 +- 11 files changed, 42 insertions(+), 41 deletions(-) diff --git a/cfcomponent/config.go b/cfcomponent/config.go index 7b618c6..16d962d 100644 --- a/cfcomponent/config.go +++ b/cfcomponent/config.go @@ -17,17 +17,16 @@ type Config struct { NatsPort int NatsUser string NatsPass string - MbusClient yagnats.ApceraWrapperNATSClient + MbusClient yagnats.NATSConn } -var DefaultYagnatsClientProvider = func(logger *gosteno.Logger, c *Config) (natsClient yagnats.ApceraWrapperNATSClient, err error) { +var DefaultYagnatsClientProvider = func(logger *gosteno.Logger, c *Config) (natsClient yagnats.NATSConn, err error) { members := make([]string, 0) for _, natsHost := range c.NatsHosts { members = append(members, fmt.Sprintf("nats://%s:%s@%s:%d", c.NatsUser, c.NatsPass, natsHost, c.NatsPort)) } - natsClient = yagnats.NewApceraClientWrapper(members) - err = natsClient.Connect() + natsClient, err = yagnats.Connect(members) if err != nil { return nil, errors.New(fmt.Sprintf("Could not connect to NATS: %v", err.Error())) } diff --git a/cfcomponent/registrars/collectorregistrar/collector_registrar.go b/cfcomponent/registrars/collectorregistrar/collector_registrar.go index 3b805a8..dda9c67 100644 --- a/cfcomponent/registrars/collectorregistrar/collector_registrar.go +++ b/cfcomponent/registrars/collectorregistrar/collector_registrar.go @@ -3,13 +3,14 @@ package collectorregistrar import ( "encoding/json" "fmt" + "time" + "github.com/cloudfoundry/gosteno" "github.com/cloudfoundry/loggregatorlib/cfcomponent" "github.com/cloudfoundry/yagnats" - "time" ) -type ClientProvider func(*gosteno.Logger, *cfcomponent.Config) (yagnats.ApceraWrapperNATSClient, error) +type ClientProvider func(*gosteno.Logger, *cfcomponent.Config) (yagnats.NATSConn, error) type CollectorRegistrar interface { Run(stopChan <-chan struct{}) @@ -20,7 +21,7 @@ type collectorRegistrar struct { interval time.Duration logger *gosteno.Logger cfc cfcomponent.Component - client yagnats.ApceraWrapperNATSClient + client yagnats.NATSConn config *cfcomponent.Config } @@ -46,7 +47,7 @@ func (registrar *collectorRegistrar) Run(stopChan <-chan struct{}) { err := registrar.announceMessage() if err != nil { if registrar.client != nil { - registrar.client.Disconnect() + registrar.client.Close() registrar.client = nil } registrar.logger.Warn(err.Error()) diff --git a/cfcomponent/registrars/collectorregistrar/collectorregistrar_test.go b/cfcomponent/registrars/collectorregistrar/collectorregistrar_test.go index 02657ec..9f30156 100644 --- a/cfcomponent/registrars/collectorregistrar/collectorregistrar_test.go +++ b/cfcomponent/registrars/collectorregistrar/collectorregistrar_test.go @@ -4,6 +4,9 @@ import ( "github.com/cloudfoundry/loggregatorlib/cfcomponent/registrars/collectorregistrar" "errors" + "sync/atomic" + "time" + "github.com/apcera/nats" "github.com/cloudfoundry/gosteno" "github.com/cloudfoundry/loggregatorlib/cfcomponent" @@ -12,8 +15,6 @@ import ( "github.com/cloudfoundry/yagnats/fakeyagnats" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "sync/atomic" - "time" ) var _ = Describe("Collectorregistrar", func() { @@ -36,7 +37,7 @@ var _ = Describe("Collectorregistrar", func() { return nil } fakeClientProviderCallCount = 0 - fakeClientProvider := func(*gosteno.Logger, *cfcomponent.Config) (yagnats.ApceraWrapperNATSClient, error) { + fakeClientProvider := func(*gosteno.Logger, *cfcomponent.Config) (yagnats.NATSConn, error) { atomic.AddInt32(&fakeClientProviderCallCount, 1) return fakeClient, errorProvider() } @@ -90,7 +91,7 @@ var _ = Describe("Collectorregistrar", func() { }) It("disconnects the client", func() { - Eventually(fakeClient.Disconnected).Should(BeTrue()) + Eventually(fakeClient.Closed).Should(BeTrue()) }) }) }) @@ -98,24 +99,24 @@ var _ = Describe("Collectorregistrar", func() { }) type fakeClient struct { - *fakeyagnats.FakeApceraWrapper - disconnected bool + *fakeyagnats.FakeNATSConn + closed bool } func newFakeClient() *fakeClient { return &fakeClient{ - FakeApceraWrapper: fakeyagnats.NewApceraClientWrapper(), + FakeNATSConn: fakeyagnats.Connect(), } } -func (f *fakeClient) Disconnect() { +func (f *fakeClient) Close() { f.Lock() defer f.Unlock() - f.disconnected = true + f.closed = true } -func (f *fakeClient) Disconnected() bool { +func (f *fakeClient) Closed() bool { f.Lock() defer f.Unlock() - return f.disconnected + return f.closed } diff --git a/cfcomponent/registrars/legacycollectorregistrar/collector_registrar.go b/cfcomponent/registrars/legacycollectorregistrar/collector_registrar.go index 1a4ab88..f35af44 100644 --- a/cfcomponent/registrars/legacycollectorregistrar/collector_registrar.go +++ b/cfcomponent/registrars/legacycollectorregistrar/collector_registrar.go @@ -11,10 +11,10 @@ import ( type collectorRegistrar struct { *gosteno.Logger - mBusClient yagnats.ApceraWrapperNATSClient + mBusClient yagnats.NATSConn } -func NewCollectorRegistrar(mBusClient yagnats.ApceraWrapperNATSClient, logger *gosteno.Logger) *collectorRegistrar { +func NewCollectorRegistrar(mBusClient yagnats.NATSConn, logger *gosteno.Logger) *collectorRegistrar { return &collectorRegistrar{mBusClient: mBusClient, Logger: logger} } diff --git a/cfcomponent/registrars/legacycollectorregistrar/collector_registrar_test.go b/cfcomponent/registrars/legacycollectorregistrar/collector_registrar_test.go index 16d6d91..a1d32f8 100644 --- a/cfcomponent/registrars/legacycollectorregistrar/collector_registrar_test.go +++ b/cfcomponent/registrars/legacycollectorregistrar/collector_registrar_test.go @@ -13,7 +13,7 @@ import ( func TestAnnounceComponent(t *testing.T) { cfc := getTestCFComponent() - mbus := fakeyagnats.NewApceraClientWrapper() + mbus := fakeyagnats.Connect() called := make(chan *nats.Msg, 10) mbus.Subscribe(AnnounceComponentMessageSubject, func(response *nats.Msg) { @@ -31,7 +31,7 @@ func TestAnnounceComponent(t *testing.T) { func TestSubscribeToComponentDiscover(t *testing.T) { cfc := getTestCFComponent() - mbus := fakeyagnats.NewApceraClientWrapper() + mbus := fakeyagnats.Connect() called := make(chan *nats.Msg, 10) mbus.Subscribe(DiscoverComponentMessageSubject, func(response *nats.Msg) { @@ -42,7 +42,7 @@ func TestSubscribeToComponentDiscover(t *testing.T) { registrar.subscribeToComponentDiscover(cfc) expectedJson, _ := createYagnatsMessage(t, DiscoverComponentMessageSubject) - mbus.PublishWithReplyTo(DiscoverComponentMessageSubject, "unused-reply", expectedJson) + mbus.PublishRequest(DiscoverComponentMessageSubject, "unused-reply", expectedJson) payloadBytes := (<-called).Data assert.Equal(t, expectedJson, payloadBytes) diff --git a/cfcomponent/registrars/routerregistrar/router_registrar.go b/cfcomponent/registrars/routerregistrar/router_registrar.go index 7c7719f..8a0d1d5 100644 --- a/cfcomponent/registrars/routerregistrar/router_registrar.go +++ b/cfcomponent/registrars/routerregistrar/router_registrar.go @@ -15,12 +15,12 @@ import ( type registrar struct { *gosteno.Logger - mBusClient yagnats.ApceraWrapperNATSClient + mBusClient yagnats.NATSConn routerRegisterInterval time.Duration lock sync.RWMutex } -func NewRouterRegistrar(mBusClient yagnats.ApceraWrapperNATSClient, logger *gosteno.Logger) *registrar { +func NewRouterRegistrar(mBusClient yagnats.NATSConn, logger *gosteno.Logger) *registrar { return ®istrar{mBusClient: mBusClient, Logger: logger} } @@ -60,7 +60,7 @@ func (r *registrar) greetRouter() (err error) { callback([]byte(msg.Data)) }) - r.mBusClient.PublishWithReplyTo(RouterGreetMessageSubject, inbox, []byte{}) + r.mBusClient.PublishRequest(RouterGreetMessageSubject, inbox, []byte{}) routerRegisterInterval := 20 * time.Second timer := time.NewTimer(30 * time.Second) diff --git a/cfcomponent/registrars/routerregistrar/router_registrar_test.go b/cfcomponent/registrars/routerregistrar/router_registrar_test.go index 7abc514..ccf1478 100644 --- a/cfcomponent/registrars/routerregistrar/router_registrar_test.go +++ b/cfcomponent/registrars/routerregistrar/router_registrar_test.go @@ -15,7 +15,7 @@ func TestGreetRouter(t *testing.T) { routerReceivedChannel := make(chan *nats.Msg, 10) resultChan := make(chan bool) - mbus := fakeyagnats.NewApceraClientWrapper() + mbus := fakeyagnats.Connect() fakeRouter(mbus, routerReceivedChannel) registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger()) @@ -50,7 +50,7 @@ func TestDefaultIntervalIsSetWhenGreetRouterFails(t *testing.T) { routerReceivedChannel := make(chan *nats.Msg) resultChan := make(chan bool) - mbus := fakeyagnats.NewApceraClientWrapper() + mbus := fakeyagnats.Connect() fakeBrokenGreeterRouter(mbus, routerReceivedChannel) registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger()) @@ -82,7 +82,7 @@ func TestDefaultIntervalIsSetWhenGreetRouterFails(t *testing.T) { func TestDefaultIntervalIsSetWhenGreetWithoutRouter(t *testing.T) { resultChan := make(chan bool) - mbus := fakeyagnats.NewApceraClientWrapper() + mbus := fakeyagnats.Connect() registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger()) go func() { @@ -111,7 +111,7 @@ func TestDefaultIntervalIsSetWhenGreetWithoutRouter(t *testing.T) { } func TestKeepRegisteringWithRouter(t *testing.T) { - mbus := fakeyagnats.NewApceraClientWrapper() + mbus := fakeyagnats.Connect() os.Setenv("LOG_TO_STDOUT", "false") routerReceivedChannel := make(chan *nats.Msg) fakeRouter(mbus, routerReceivedChannel) @@ -132,7 +132,7 @@ func TestKeepRegisteringWithRouter(t *testing.T) { } func TestSubscribeToRouterStart(t *testing.T) { - mbus := fakeyagnats.NewApceraClientWrapper() + mbus := fakeyagnats.Connect() registrar := NewRouterRegistrar(mbus, loggertesthelper.Logger()) registrar.subscribeToRouterStart() @@ -161,7 +161,7 @@ func TestSubscribeToRouterStart(t *testing.T) { } func TestUnregisterFromRouter(t *testing.T) { - mbus := fakeyagnats.NewApceraClientWrapper() + mbus := fakeyagnats.Connect() routerReceivedChannel := make(chan *nats.Msg, 10) fakeRouter(mbus, routerReceivedChannel) @@ -183,7 +183,7 @@ const messageFromRouter = `{ "minimumRegisterIntervalInSeconds": 42 }` -func fakeRouter(mbus *fakeyagnats.FakeApceraWrapper, returnChannel chan *nats.Msg) { +func fakeRouter(mbus *fakeyagnats.FakeNATSConn, returnChannel chan *nats.Msg) { mbus.Subscribe("router.greet", func(msg *nats.Msg) { mbus.Publish(msg.Reply, []byte(messageFromRouter)) }) @@ -208,7 +208,7 @@ func fakeRouter(mbus *fakeyagnats.FakeApceraWrapper, returnChannel chan *nats.Ms }) } -func fakeBrokenGreeterRouter(mbus *fakeyagnats.FakeApceraWrapper, returnChannel chan *nats.Msg) { +func fakeBrokenGreeterRouter(mbus *fakeyagnats.FakeNATSConn, returnChannel chan *nats.Msg) { mbus.Subscribe("router.greet", func(msg *nats.Msg) { mbus.Publish(msg.Reply, []byte("garbel garbel")) diff --git a/server/handlers/http_handler.go b/server/handlers/http_handler.go index 5097c08..be67907 100644 --- a/server/handlers/http_handler.go +++ b/server/handlers/http_handler.go @@ -1,14 +1,14 @@ package handlers import ( + "github.com/cloudfoundry/gosteno" "mime/multipart" "net/http" - "github.com/cloudfoundry/gosteno" ) type httpHandler struct { messages <-chan []byte - logger *gosteno.Logger + logger *gosteno.Logger } func NewHttpHandler(m <-chan []byte, logger *gosteno.Logger) *httpHandler { diff --git a/server/handlers/http_handler_test.go b/server/handlers/http_handler_test.go index f06701b..c243bce 100644 --- a/server/handlers/http_handler_test.go +++ b/server/handlers/http_handler_test.go @@ -1,6 +1,7 @@ package handlers_test import ( + "github.com/cloudfoundry/loggregatorlib/loggertesthelper" "github.com/cloudfoundry/loggregatorlib/server/handlers" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -9,7 +10,6 @@ import ( "net/http" "net/http/httptest" "regexp" - "github.com/cloudfoundry/loggregatorlib/loggertesthelper" ) var _ = Describe("HttpHandler", func() { diff --git a/server/handlers/websocket_handler.go b/server/handlers/websocket_handler.go index 5485e82..aab5923 100644 --- a/server/handlers/websocket_handler.go +++ b/server/handlers/websocket_handler.go @@ -1,17 +1,17 @@ package handlers import ( + "github.com/cloudfoundry/gosteno" "github.com/cloudfoundry/loggregatorlib/server" "github.com/gorilla/websocket" "net/http" "time" - "github.com/cloudfoundry/gosteno" ) type websocketHandler struct { messages <-chan []byte keepAlive time.Duration - logger *gosteno.Logger + logger *gosteno.Logger } func NewWebsocketHandler(m <-chan []byte, keepAlive time.Duration, logger *gosteno.Logger) *websocketHandler { diff --git a/server/handlers/websocket_handler_test.go b/server/handlers/websocket_handler_test.go index 26eb88d..06aaf75 100644 --- a/server/handlers/websocket_handler_test.go +++ b/server/handlers/websocket_handler_test.go @@ -1,6 +1,7 @@ package handlers_test import ( + "github.com/cloudfoundry/loggregatorlib/loggertesthelper" "github.com/cloudfoundry/loggregatorlib/server/handlers" "github.com/gorilla/websocket" . "github.com/onsi/ginkgo" @@ -8,7 +9,6 @@ import ( "net/http" "net/http/httptest" "time" - "github.com/cloudfoundry/loggregatorlib/loggertesthelper" ) var _ = Describe("WebsocketHandler", func() {