This repository has been archived by the owner on Jan 21, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add new implementation of collector registrar
- send announce message on a periodic schedule - do not subscribe to discover message [#79030122] Signed-off-by: Johannes Petzold <jpetzold@pivotal.io>
- Loading branch information
Andrew Poydence
committed
Sep 24, 2014
1 parent
888b219
commit 5de5c4e
Showing
4 changed files
with
243 additions
and
0 deletions.
There are no files selected for viewing
29 changes: 29 additions & 0 deletions
29
cfcomponent/registrars/collectorregistrar/collector_messages.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package collectorregistrar | ||
|
||
import ( | ||
"fmt" | ||
"github.com/cloudfoundry/loggregatorlib/cfcomponent" | ||
) | ||
|
||
const AnnounceComponentMessageSubject = "vcap.component.announce" | ||
const DiscoverComponentMessageSubject = "vcap.component.discover" | ||
|
||
type AnnounceComponentMessage struct { | ||
Type string `json:"type"` | ||
Index uint `json:"index"` | ||
Host string `json:"host"` | ||
UUID string `json:"uuid"` | ||
Credentials []string `json:"credentials"` | ||
} | ||
|
||
func NewAnnounceComponentMessage(cfc cfcomponent.Component) (message *AnnounceComponentMessage) { | ||
message = &AnnounceComponentMessage{ | ||
Type: cfc.Type, | ||
Index: cfc.Index, | ||
Host: fmt.Sprintf("%s:%d", cfc.IpAddress, cfc.StatusPort), | ||
UUID: fmt.Sprintf("%d-%s", cfc.Index, cfc.UUID), | ||
Credentials: cfc.StatusCredentials, | ||
} | ||
|
||
return message | ||
} |
80 changes: 80 additions & 0 deletions
80
cfcomponent/registrars/collectorregistrar/collector_registrar.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package collectorregistrar | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"github.com/cloudfoundry/gosteno" | ||
"github.com/cloudfoundry/loggregatorlib/cfcomponent" | ||
"github.com/cloudfoundry/yagnats" | ||
"time" | ||
) | ||
|
||
type ClientProvider func(*gosteno.Logger, *cfcomponent.Config) (yagnats.ApceraWrapperNATSClient, error) | ||
|
||
type CollectorRegistrar interface { | ||
Run(stopChan <-chan struct{}) | ||
} | ||
|
||
type collectorRegistrar struct { | ||
clientProvider ClientProvider | ||
interval time.Duration | ||
logger *gosteno.Logger | ||
cfc cfcomponent.Component | ||
client yagnats.ApceraWrapperNATSClient | ||
config *cfcomponent.Config | ||
} | ||
|
||
func NewCollectorRegistrar(clientProvider ClientProvider, cfc cfcomponent.Component, interval time.Duration, config *cfcomponent.Config) CollectorRegistrar { | ||
return &collectorRegistrar{ | ||
clientProvider: clientProvider, | ||
logger: cfc.Logger, | ||
cfc: cfc, | ||
interval: interval, | ||
config: config, | ||
} | ||
} | ||
|
||
func (registrar *collectorRegistrar) Run(stopChan <-chan struct{}) { | ||
ticker := time.NewTicker(registrar.interval) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-stopChan: | ||
return | ||
case <-ticker.C: | ||
err := registrar.announceMessage() | ||
if err != nil { | ||
if registrar.client != nil { | ||
registrar.client.Disconnect() | ||
registrar.client = nil | ||
} | ||
registrar.logger.Warn(err.Error()) | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (registrar *collectorRegistrar) announceMessage() error { | ||
if registrar.client == nil { | ||
registrar.logger.Debugf("creating NATS client") | ||
|
||
var err error | ||
registrar.client, err = registrar.clientProvider(registrar.logger, registrar.config) | ||
if err != nil { | ||
return fmt.Errorf("Failed to create client: %s", err) | ||
} | ||
} | ||
|
||
json, err := json.Marshal(NewAnnounceComponentMessage(registrar.cfc)) | ||
if err != nil { | ||
return fmt.Errorf("Failed to marshal component message: %s", err) | ||
} | ||
|
||
err = registrar.client.Publish(AnnounceComponentMessageSubject, json) | ||
if err != nil { | ||
return fmt.Errorf("Failed to publish: %s", err) | ||
} | ||
|
||
return nil | ||
} |
13 changes: 13 additions & 0 deletions
13
cfcomponent/registrars/collectorregistrar/collectorregistrar_suite_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package collectorregistrar_test | ||
|
||
import ( | ||
. "github.com/onsi/ginkgo" | ||
. "github.com/onsi/gomega" | ||
|
||
"testing" | ||
) | ||
|
||
func TestCollectorregistrar(t *testing.T) { | ||
RegisterFailHandler(Fail) | ||
RunSpecs(t, "Collectorregistrar Suite") | ||
} |
121 changes: 121 additions & 0 deletions
121
cfcomponent/registrars/collectorregistrar/collectorregistrar_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package collectorregistrar_test | ||
|
||
import ( | ||
"github.com/cloudfoundry/loggregatorlib/cfcomponent/registrars/collectorregistrar" | ||
|
||
"errors" | ||
"github.com/apcera/nats" | ||
"github.com/cloudfoundry/gosteno" | ||
"github.com/cloudfoundry/loggregatorlib/cfcomponent" | ||
"github.com/cloudfoundry/loggregatorlib/loggertesthelper" | ||
"github.com/cloudfoundry/yagnats" | ||
"github.com/cloudfoundry/yagnats/fakeyagnats" | ||
. "github.com/onsi/ginkgo" | ||
. "github.com/onsi/gomega" | ||
"sync/atomic" | ||
"time" | ||
) | ||
|
||
var _ = Describe("Collectorregistrar", func() { | ||
Describe("Run", func() { | ||
var ( | ||
fakeClient *fakeClient | ||
component cfcomponent.Component | ||
registrar collectorregistrar.CollectorRegistrar | ||
stopChan chan struct{} | ||
doneChan chan struct{} | ||
errorProvider func() error | ||
fakeClientProviderCallCount int32 | ||
) | ||
|
||
BeforeEach(func() { | ||
fakeClient = newFakeClient() | ||
component, _ = cfcomponent.NewComponent(loggertesthelper.Logger(), "compType", 3, nil, 9999, []string{"username", "password"}, nil) | ||
component.UUID = "OurUUID" | ||
errorProvider = func() error { | ||
return nil | ||
} | ||
fakeClientProviderCallCount = 0 | ||
fakeClientProvider := func(*gosteno.Logger, *cfcomponent.Config) (yagnats.ApceraWrapperNATSClient, error) { | ||
atomic.AddInt32(&fakeClientProviderCallCount, 1) | ||
return fakeClient, errorProvider() | ||
} | ||
registrar = collectorregistrar.NewCollectorRegistrar(fakeClientProvider, component, 10*time.Millisecond, nil) | ||
stopChan = make(chan struct{}) | ||
doneChan = make(chan struct{}) | ||
|
||
go func() { | ||
defer close(doneChan) | ||
registrar.Run(stopChan) | ||
}() | ||
}) | ||
|
||
AfterEach(func() { | ||
close(stopChan) | ||
Eventually(doneChan).Should(BeClosed()) | ||
}) | ||
|
||
Context("with no errors", func() { | ||
It("periodically announces itself via NATS", func() { | ||
var messages []*nats.Msg | ||
Eventually(func() int { | ||
messages = fakeClient.PublishedMessages(collectorregistrar.AnnounceComponentMessageSubject) | ||
return len(messages) | ||
}).Should(BeNumerically(">", 1)) | ||
|
||
for _, message := range messages { | ||
Expect(message.Data).To(MatchRegexp(`^\{"type":"compType","index":3,"host":"[^:]*:9999","uuid":"3-OurUUID","credentials":\["username","password"\]\}$`)) | ||
} | ||
}) | ||
|
||
It("reuses the client connection", func() { | ||
Eventually(func() int32 { return atomic.LoadInt32(&fakeClientProviderCallCount) }).Should(BeEquivalentTo(1)) | ||
Consistently(func() int32 { return atomic.LoadInt32(&fakeClientProviderCallCount) }).Should(BeEquivalentTo(1)) | ||
}) | ||
}) | ||
|
||
Context("with errors", func() { | ||
Context("from the client provider", func() { | ||
BeforeEach(func() { | ||
fakeError := errors.New("fake error") | ||
errorProvider = func() error { | ||
returnedError := fakeError | ||
fakeError = nil | ||
return returnedError | ||
} | ||
}) | ||
|
||
It("recovers when the client provider recovers", func() { | ||
Eventually(fakeClient.PublishedMessageCount).Should(BeNumerically(">", 0)) | ||
}) | ||
|
||
It("disconnects the client", func() { | ||
Eventually(fakeClient.Disconnected).Should(BeTrue()) | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
|
||
type fakeClient struct { | ||
*fakeyagnats.FakeApceraWrapper | ||
disconnected bool | ||
} | ||
|
||
func newFakeClient() *fakeClient { | ||
return &fakeClient{ | ||
FakeApceraWrapper: fakeyagnats.NewApceraClientWrapper(), | ||
} | ||
} | ||
|
||
func (f *fakeClient) Disconnect() { | ||
f.Lock() | ||
defer f.Unlock() | ||
f.disconnected = true | ||
} | ||
|
||
func (f *fakeClient) Disconnected() bool { | ||
f.Lock() | ||
defer f.Unlock() | ||
return f.disconnected | ||
} |