diff --git a/cfcomponent/registrars/collectorregistrar/collector_messages.go b/cfcomponent/registrars/collectorregistrar/collector_messages.go new file mode 100644 index 0000000..f45caaf --- /dev/null +++ b/cfcomponent/registrars/collectorregistrar/collector_messages.go @@ -0,0 +1,29 @@ +package collectorregistrar + +import ( + "fmt" + "github.com/cloudfoundry/loggregatorlib/cfcomponent" +) + +const AnnounceComponentMessageSubject = "vcap.component.announce" +const DiscoverComponentMessageSubject = "vcap.component.discover" + +type AnnounceComponentMessage struct { + Type string `json:"type"` + Index uint `json:"index"` + Host string `json:"host"` + UUID string `json:"uuid"` + Credentials []string `json:"credentials"` +} + +func NewAnnounceComponentMessage(cfc cfcomponent.Component) (message *AnnounceComponentMessage) { + message = &AnnounceComponentMessage{ + Type: cfc.Type, + Index: cfc.Index, + Host: fmt.Sprintf("%s:%d", cfc.IpAddress, cfc.StatusPort), + UUID: fmt.Sprintf("%d-%s", cfc.Index, cfc.UUID), + Credentials: cfc.StatusCredentials, + } + + return message +} diff --git a/cfcomponent/registrars/collectorregistrar/collector_registrar.go b/cfcomponent/registrars/collectorregistrar/collector_registrar.go new file mode 100644 index 0000000..3b805a8 --- /dev/null +++ b/cfcomponent/registrars/collectorregistrar/collector_registrar.go @@ -0,0 +1,80 @@ +package collectorregistrar + +import ( + "encoding/json" + "fmt" + "github.com/cloudfoundry/gosteno" + "github.com/cloudfoundry/loggregatorlib/cfcomponent" + "github.com/cloudfoundry/yagnats" + "time" +) + +type ClientProvider func(*gosteno.Logger, *cfcomponent.Config) (yagnats.ApceraWrapperNATSClient, error) + +type CollectorRegistrar interface { + Run(stopChan <-chan struct{}) +} + +type collectorRegistrar struct { + clientProvider ClientProvider + interval time.Duration + logger *gosteno.Logger + cfc cfcomponent.Component + client yagnats.ApceraWrapperNATSClient + config *cfcomponent.Config +} + +func NewCollectorRegistrar(clientProvider ClientProvider, cfc cfcomponent.Component, interval time.Duration, config *cfcomponent.Config) CollectorRegistrar { + return &collectorRegistrar{ + clientProvider: clientProvider, + logger: cfc.Logger, + cfc: cfc, + interval: interval, + config: config, + } +} + +func (registrar *collectorRegistrar) Run(stopChan <-chan struct{}) { + ticker := time.NewTicker(registrar.interval) + defer ticker.Stop() + + for { + select { + case <-stopChan: + return + case <-ticker.C: + err := registrar.announceMessage() + if err != nil { + if registrar.client != nil { + registrar.client.Disconnect() + registrar.client = nil + } + registrar.logger.Warn(err.Error()) + } + } + } +} + +func (registrar *collectorRegistrar) announceMessage() error { + if registrar.client == nil { + registrar.logger.Debugf("creating NATS client") + + var err error + registrar.client, err = registrar.clientProvider(registrar.logger, registrar.config) + if err != nil { + return fmt.Errorf("Failed to create client: %s", err) + } + } + + json, err := json.Marshal(NewAnnounceComponentMessage(registrar.cfc)) + if err != nil { + return fmt.Errorf("Failed to marshal component message: %s", err) + } + + err = registrar.client.Publish(AnnounceComponentMessageSubject, json) + if err != nil { + return fmt.Errorf("Failed to publish: %s", err) + } + + return nil +} diff --git a/cfcomponent/registrars/collectorregistrar/collectorregistrar_suite_test.go b/cfcomponent/registrars/collectorregistrar/collectorregistrar_suite_test.go new file mode 100644 index 0000000..8d0afd7 --- /dev/null +++ b/cfcomponent/registrars/collectorregistrar/collectorregistrar_suite_test.go @@ -0,0 +1,13 @@ +package collectorregistrar_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestCollectorregistrar(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Collectorregistrar Suite") +} diff --git a/cfcomponent/registrars/collectorregistrar/collectorregistrar_test.go b/cfcomponent/registrars/collectorregistrar/collectorregistrar_test.go new file mode 100644 index 0000000..02657ec --- /dev/null +++ b/cfcomponent/registrars/collectorregistrar/collectorregistrar_test.go @@ -0,0 +1,121 @@ +package collectorregistrar_test + +import ( + "github.com/cloudfoundry/loggregatorlib/cfcomponent/registrars/collectorregistrar" + + "errors" + "github.com/apcera/nats" + "github.com/cloudfoundry/gosteno" + "github.com/cloudfoundry/loggregatorlib/cfcomponent" + "github.com/cloudfoundry/loggregatorlib/loggertesthelper" + "github.com/cloudfoundry/yagnats" + "github.com/cloudfoundry/yagnats/fakeyagnats" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "sync/atomic" + "time" +) + +var _ = Describe("Collectorregistrar", func() { + Describe("Run", func() { + var ( + fakeClient *fakeClient + component cfcomponent.Component + registrar collectorregistrar.CollectorRegistrar + stopChan chan struct{} + doneChan chan struct{} + errorProvider func() error + fakeClientProviderCallCount int32 + ) + + BeforeEach(func() { + fakeClient = newFakeClient() + component, _ = cfcomponent.NewComponent(loggertesthelper.Logger(), "compType", 3, nil, 9999, []string{"username", "password"}, nil) + component.UUID = "OurUUID" + errorProvider = func() error { + return nil + } + fakeClientProviderCallCount = 0 + fakeClientProvider := func(*gosteno.Logger, *cfcomponent.Config) (yagnats.ApceraWrapperNATSClient, error) { + atomic.AddInt32(&fakeClientProviderCallCount, 1) + return fakeClient, errorProvider() + } + registrar = collectorregistrar.NewCollectorRegistrar(fakeClientProvider, component, 10*time.Millisecond, nil) + stopChan = make(chan struct{}) + doneChan = make(chan struct{}) + + go func() { + defer close(doneChan) + registrar.Run(stopChan) + }() + }) + + AfterEach(func() { + close(stopChan) + Eventually(doneChan).Should(BeClosed()) + }) + + Context("with no errors", func() { + It("periodically announces itself via NATS", func() { + var messages []*nats.Msg + Eventually(func() int { + messages = fakeClient.PublishedMessages(collectorregistrar.AnnounceComponentMessageSubject) + return len(messages) + }).Should(BeNumerically(">", 1)) + + for _, message := range messages { + Expect(message.Data).To(MatchRegexp(`^\{"type":"compType","index":3,"host":"[^:]*:9999","uuid":"3-OurUUID","credentials":\["username","password"\]\}$`)) + } + }) + + It("reuses the client connection", func() { + Eventually(func() int32 { return atomic.LoadInt32(&fakeClientProviderCallCount) }).Should(BeEquivalentTo(1)) + Consistently(func() int32 { return atomic.LoadInt32(&fakeClientProviderCallCount) }).Should(BeEquivalentTo(1)) + }) + }) + + Context("with errors", func() { + Context("from the client provider", func() { + BeforeEach(func() { + fakeError := errors.New("fake error") + errorProvider = func() error { + returnedError := fakeError + fakeError = nil + return returnedError + } + }) + + It("recovers when the client provider recovers", func() { + Eventually(fakeClient.PublishedMessageCount).Should(BeNumerically(">", 0)) + }) + + It("disconnects the client", func() { + Eventually(fakeClient.Disconnected).Should(BeTrue()) + }) + }) + }) + }) +}) + +type fakeClient struct { + *fakeyagnats.FakeApceraWrapper + disconnected bool +} + +func newFakeClient() *fakeClient { + return &fakeClient{ + FakeApceraWrapper: fakeyagnats.NewApceraClientWrapper(), + } +} + +func (f *fakeClient) Disconnect() { + f.Lock() + defer f.Unlock() + f.disconnected = true +} + +func (f *fakeClient) Disconnected() bool { + f.Lock() + defer f.Unlock() + return f.disconnected +}