Skip to content
Permalink
Browse files

Refactoring without client intermediary

  • Loading branch information...
martinbaillie committed Apr 10, 2017
1 parent 2a823da commit 443f6ead51a91ddf48136543c1878aedb3efb2fd

This file was deleted.

This file was deleted.

@@ -29,37 +29,36 @@ func TestIntegration(t *testing.T) {
t.Skip("EUREKA_ADDR is not set")
}

var client Client
{
var fargoConfig fargo.Config
fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr}
fargoConfig.Eureka.PollIntervalSeconds = 1

fargoConnection := fargo.NewConnFromConfig(fargoConfig)
client = NewClient(&fargoConnection)
}

logger := log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestamp)

var fargoConfig fargo.Config
// Target Eureka server(s).
fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr}
// How often the subscriber should poll for updates.
fargoConfig.Eureka.PollIntervalSeconds = 1

// Create a Fargo connection and a Eureka registrar.
fargoConnection := fargo.NewConnFromConfig(fargoConfig)
registrar1 := NewRegistrar(&fargoConnection, instanceTest1, log.With(logger, "component", "registrar1"))

// Register one instance.
registrar1 := NewRegistrar(client, instanceTest1, log.With(logger, "component", "registrar1"))
registrar1.Register()
defer registrar1.Deregister()

// This should be enough time for the Eureka server response cache to update.
time.Sleep(time.Second)

// Build a subscriber.
// Build a Eureka subscriber.
factory := func(instance string) (endpoint.Endpoint, io.Closer, error) {
t.Logf("factory invoked for %q", instance)
return endpoint.Nop, nil, nil
}
s := NewSubscriber(
client,
&fargoConnection,
appNameTest,
factory,
log.With(logger, "component", "subscriber"),
instanceTest1.App,
)
defer s.Stop()

@@ -73,7 +72,7 @@ func TestIntegration(t *testing.T) {
}

// Register a second instance
registrar2 := NewRegistrar(client, instanceTest2, log.With(logger, "component", "registrar2"))
registrar2 := NewRegistrar(&fargoConnection, instanceTest2, log.With(logger, "component", "registrar2"))
registrar2.Register()
defer registrar2.Deregister() // In case of exceptional circumstances.

@@ -2,79 +2,126 @@ package eureka

import (
"fmt"
"net/http"
"sync"
"time"

"github.com/hudl/fargo"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
)

// Matches official Netflix Java client default.
const defaultRenewalInterval = 30 * time.Second

// The methods of fargo.Connection used in this package.
type fargoConnection interface {
RegisterInstance(instance *fargo.Instance) error
DeregisterInstance(instance *fargo.Instance) error
ReregisterInstance(instance *fargo.Instance) error
HeartBeatInstance(instance *fargo.Instance) error
ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
GetApp(name string) (*fargo.Application, error)
}

type fargoUnsuccessfulHTTPResponse struct {
statusCode int
messagePrefix string
}

// Registrar maintains service instance liveness information in Eureka.
type Registrar struct {
client Client
conn fargoConnection
instance *fargo.Instance
logger log.Logger
quit chan struct{}
wg sync.WaitGroup
quitc chan chan struct{}
sync.Mutex
}

var _ sd.Registrar = (*Registrar)(nil)

// NewRegistrar returns an Eureka Registrar acting on behalf of the provided
// Fargo instance.
func NewRegistrar(client Client, i *fargo.Instance, logger log.Logger) *Registrar {
// Fargo connection and instance. See the integration test for usage examples.
func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
return &Registrar{
client: client,
instance: i,
logger: log.With(logger, "service", i.App, "address", fmt.Sprintf("%s:%d", i.IPAddr, i.Port)),
conn: conn,
instance: instance,
logger: log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)),
}
}

// Register implements sd.Registrar interface.
// Register implements sd.Registrar.
func (r *Registrar) Register() {
if err := r.client.Register(r.instance); err != nil {
r.logger.Log("err", err)
} else {
r.logger.Log("action", "register")
r.Lock()
defer r.Unlock()

if r.quitc != nil {
return // Already in the registration loop.
}

if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
// User has opted for heartbeat functionality in Eureka.
if r.quit == nil {
r.quit = make(chan struct{})
r.wg.Add(1)
go r.loop()
}
if err := r.conn.RegisterInstance(r.instance); err != nil {
r.logger.Log("during", "Register", "err", err)
}

r.quitc = make(chan chan struct{})
go r.loop()
}

// Deregister implements sd.Registrar interface.
// Deregister implements sd.Registrar.
func (r *Registrar) Deregister() {
if err := r.client.Deregister(r.instance); err != nil {
r.logger.Log("err", err)
} else {
r.logger.Log("action", "deregister")
}
r.Lock()
defer r.Unlock()

if r.quit != nil {
close(r.quit)
r.wg.Wait()
r.quit = nil
if r.quitc == nil {
return // Already deregistered.
}

q := make(chan struct{})
r.quitc <- q
<-q
r.quitc = nil
}

func (r *Registrar) loop() {
tick := time.NewTicker(time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second)
defer tick.Stop()
defer r.wg.Done()
var renewalInterval time.Duration
if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second
} else {
renewalInterval = defaultRenewalInterval
}
ticker := time.NewTicker(renewalInterval)
defer ticker.Stop()

for {
select {
case <-tick.C:
if err := r.client.Heartbeat(r.instance); err != nil {
r.logger.Log("err", err)
case <-ticker.C:
if err := r.heartbeat(); err != nil {
r.logger.Log("during", "heartbeat", "err", err)
}

case q := <-r.quitc:
if err := r.conn.DeregisterInstance(r.instance); err != nil {
r.logger.Log("during", "Deregister", "err", err)
}
case <-r.quit:
close(q)
return
}
}
}

func (r *Registrar) heartbeat() error {
err := r.conn.HeartBeatInstance(r.instance)
if err != nil {
if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok && u.statusCode == http.StatusNotFound {
// Instance expired (e.g. network partition). Re-register.
r.logger.Log("during", "heartbeat", err.Error())
return r.conn.ReregisterInstance(r.instance)
}
}
return err
}

func (u *fargoUnsuccessfulHTTPResponse) Error() string {
return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
}

0 comments on commit 443f6ea

Please sign in to comment.
You can’t perform that action at this time.