Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Eureka service discovery implementation #504

Merged
merged 5 commits into from Apr 10, 2017

Conversation

martinbaillie
Copy link
Contributor

This commit adds a service discovery implementation for the Eureka
registry, a component in Netflix's OSS suite. Eureka is a popular
choice in JVM-based microservice architectures, particularly when used
in conjunction with the Spring Cloud ecosystem.

This implementation delegates integration to Fargo: the de facto Golang
Eureka client. It allows the user to employ a Fargo connection as the
foundational configuration item for Registrars and Subscribers. This
should offer the user the most control of Fargo within the constraints
of the Go-kit service discovery abstractions.

This commit adds a service discovery implementation for the Eureka
registry, a component in Netflix's OSS suite. Eureka is a popular
choice in JVM-based microservice architectures, particularly when used
in conjunction with the Spring Cloud ecosystem.

This implementation delegates integration to Fargo: the de facto Golang
Eureka client. It allows the user to employ a Fargo connection as the
foundational configuration item for Registrars and Subscribers. This
should offer the user the most control of Fargo within the constraints
of the Go-kit service discovery abstractions.
Copy link
Member

@peterbourgon peterbourgon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few things to fix, but I like the shape of it! Thank you for the contribution.

package eureka

import (
stdeureka "github.com/hudl/fargo"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this aliased?

edit: to be less obscure ;) I think it makes more sense to just call this fargo everywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Wasn't entirely sure on the go-kit convention for upstream libs.

func init() {
// Quieten Fargo's own logging
stdeurekalogging.SetLevel(stdeurekalogging.ERROR, "fargo")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm unhappy to see the init. If we had completely encapsulated the Fargo dep, then this might make sense, but given we expose its types in our API, I think it makes more sense to just skip this step, and perhaps warn users in comments that Fargo is very log-spammy and they may want to do something like this...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know, I was in two mind myself. I've ditched it now. As per chat on Gophers Slack, I have a PR with Fargo to allow configurability of their logger by consumers.


// NewRegistrar returns an Eureka Registrar acting on behalf of the provided
// Fargo instance.
func NewRegistrar(client Client, i *stdeureka.Instance, l log.Logger) *Registrar {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/l/logger

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops. Force of habit. Fixed this everywhere.

defer r.quitmtx.Unlock()
if r.quit != nil {
r.quit <- true
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this seems like a lot of unnecessary ceremony. Can you think of a way to get the semantics you want without a mutex?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I was able to shuffle things around and ditch the mutex entirely. Thanks.

package eureka

import (
fargo "github.com/hudl/fargo"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need to alias the import with its own name :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doh, thanks. Too hasty with a s/stdeureka/fargo/g

"errors"
"reflect"

fargo "github.com/hudl/fargo"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise above (further comments elided)

if r.quit == nil {
r.quit = make(chan bool)
go r.loop()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, no I'm afraid this is racy, now...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fear I may need to reintroduce that mutex to remove the racy-ness completely (latest commit). I'm still getting up to speed on Go's concurrency patterns and I would welcome any thoughts you had on this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can start by gathering requirements. What behaviors are you trying to allow in this implementation of Register/Deregister? How do those behaviors compare to the other registrars?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behaviour is to make periodic heartbeat calls to the Eureka registry after initial registration. By default Eureka will expect a heartbeat every 30s, and 3 missed heartbeats will result in the instance being expired. This is why I felt it was a must-have, and while Fargo makes available a Heartbeat RPC for the correct Eureka endpoint, it does not provide the periodic functionality.

Also, these heartbeat periods and expiry thresholds are decided when setting up a Eureka server cluster and will vary in the wild. To cater for this I made the heartbeat interval configurable in the go-kit implementation using the instance's LeaseInfo.RenewalIntervalInSecs (part of the Eureka client spec).

Regarding how the behaviours compare, I had a look around and found similar functionality (I think) in sd/etcd/registrar.go with its TTLOption and mutex gated heartbeat loop. However, all that said - I had a play around there and I can achieve the same with a WaitGroup rather than Mutex. Please check the latest when you get a chance.

@peterbourgon
Copy link
Member

peterbourgon commented Apr 3, 2017

@martinbaillie On review I think the intermediary Client struct is obscuring things. I threw this together, using the fargo.EurekaConnection directly in the Registrar and Subscriber structs. I also cleaned up the termination behaviors; I think they should be race-free. What do you think?

package eureka

import (
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/hudl/fargo"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/cache"
)

// Registrar implements sd.Registrar for a
// Eureka connection and specific instance.
type Registrar struct {
	conn     *fargo.EurekaConnection
	instance *fargo.Instance
	interval time.Duration
	logger   log.Logger

	mtx   sync.Mutex
	quitc chan chan struct{}
}

var _ sd.Registrar = (*Registrar)(nil)

// NewRegistrar returns an Eureka Registrar.
func NewRegistrar(conn *fargo.EurekaConnection, instance *fargo.Instance, interval time.Duration, logger log.Logger) *Registrar {
	return &Registrar{
		conn:     conn,
		instance: instance,
		interval: interval,
		logger:   logger,
	}
}

// Register implements sd.Registrar.
func (r *Registrar) Register() {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	if r.quitc != nil {
		return // already in the register loop
	}

	r.quitc = make(chan chan struct{})
	go r.registerLoop()
}

// Deregister implements sd.Registrar.
func (r *Registrar) Deregister() {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	if r.quitc == nil {
		return // already deregistered
	}

	q := make(chan struct{})
	r.quitc <- q
	<-q
	r.quitc = nil
}

func (r *Registrar) registerLoop() {
	ticker := time.NewTicker(r.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := r.heartbeat(); err != nil {
				r.logger.Log("during", "heartbeat", "err", err)
			}

		case q := <-r.quitc:
			if err := r.conn.DeregisterInstance(r.instance); err != nil {
				r.logger.Log("during", "Deregister", "err", err)
			}
			close(q)
			return
		}
	}
}

func (r *Registrar) heartbeat() error {
	err := r.conn.HeartBeatInstance(r.instance)
	if err != nil {
		if code, ok := fargo.HTTPResponseStatusCode(err); ok && code == http.StatusNotFound {
			// Instance not registered yet.
			return r.conn.RegisterInstance(r.instance)
		}
	}
	return err
}

// Subscriber implements sd.Subscriber for a
// Eureka connection and specific app.
type Subscriber struct {
	conn    *fargo.EurekaConnection
	app     string
	factory sd.Factory
	logger  log.Logger
	cache   *cache.Cache
	quitc   chan chan struct{}
}

var _ sd.Subscriber = (*Subscriber)(nil)

// NewSubscriber returns a new Eureka-backed sd.Subscriber.
func NewSubscriber(conn *fargo.EurekaConnection, app string, factory sd.Factory, logger log.Logger) *Subscriber {
	logger = log.With(logger, "app", app)

	s := &Subscriber{
		conn:    conn,
		app:     app,
		factory: factory,
		logger:  logger,
		cache:   cache.New(factory, logger),
		quitc:   make(chan chan struct{}),
	}

	instances, err := s.getInstances()
	if err == nil {
		logger.Log("instances", len(instances))
	} else {
		logger.Log("during", "getInstances", "err", err)
	}

	s.cache.Update(instances)
	go s.loop()
	return s
}

// Endpoints implements sd.Subscriber.
func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) {
	return s.cache.Endpoints(), nil
}

func (s *Subscriber) loop() {
	var (
		await   = false
		done    = make(chan struct{})
		updatec = s.conn.ScheduleAppUpdates(s.app, await, done)
	)
	defer close(done)

	for {
		select {
		case update := <-updatec:
			if update.Err != nil {
				s.logger.Log("during", "Update", "err", update.Err)
				continue
			}
			instances := convertFargoAppToInstances(update.App)
			s.logger.Log("instances", len(instances))
			s.cache.Update(instances)

		case q := <-s.quitc:
			close(q)
			return
		}
	}
}

func (s *Subscriber) getInstances() ([]string, error) {
	app, err := s.conn.GetApp(s.app)
	if err != nil {
		return nil, err
	}
	return convertFargoAppToInstances(app), nil
}

func convertFargoAppToInstances(app *fargo.Application) []string {
	instances := make([]string, len(app.Instances))
	for i, inst := range app.Instances {
		instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port)
	}
	return instances
}

@martinbaillie
Copy link
Contributor Author

@peterbourgon I've had a quick glance today at lunch. This is actually how I started out before spotting the other sd implementation's use of the client intermediary. I'm happy to rework the code and tests sans client abstraction (may not get time 'til end of week). Just wanted to make sure you are cool with this implementation being somewhat of a departure from the others?

Also reading through the code & thinking aloud:

  • I expect I would need to introduce an interface covering all the methods of fargo.EurekaConnection that are being used so I can mock them in unit tests (the client abstraction naturally provided this previously). This may make things a little uneasy on the eyes and confuse consumers on what needs to be passed to the NewRegistrar/NewSubscriber methods.

  • I think we should register straight off the bat rather than waiting that interval for the ticker to fire in the heartbeat loop as it's common for the heartbeat interval to be upwards of 30s and a consumer would probably expect their service to get registered as soon as they call Register()

  • I was using a duration found on fargo.Instance.LeaseInfo.RenewalIntervalInSecs that's part of the Eureka XML spec. It could be used instead of that standalone interval variable on your Registrar. Any objections to using that? It would then be similar to how a consumer would set fargo.EurekaConnection.PollInterval to control subscriber updates.

@peterbourgon
Copy link
Member

I expect I would need to introduce an interface covering all the methods of fargo.EurekaConnection that are being used so I can mock them in unit tests (the client abstraction naturally provided this previously). This may make things a little uneasy on the eyes and confuse consumers on what needs to be passed to the NewRegistrar/NewSubscriber methods.

Yep, sounds good. Can just document it in the comment.

I think we should register straight off the bat rather than waiting that interval for the ticker to fire in the heartbeat loop as it's common for the heartbeat interval to be upwards of 30s and a consumer would probably expect their service to get registered as soon as they call Register()

Yep, sounds good, was an oversight of mine.

I was using a duration found on fargo.Instance.LeaseInfo.RenewalIntervalInSecs that's part of the Eureka XML spec. It could be used instead of that standalone interval variable on your Registrar. Any objections to using that? It would then be similar to how a consumer would set fargo.EurekaConnection.PollInterval to control subscriber updates.

Right! I went looking for that but didn't find it immediately. Sounds good.

@martinbaillie
Copy link
Contributor Author

Hi @peterbourgon, I've removed the client intermediary tonight based on your feedback. A pleasant side effect is it is now easy to achieve 100% coverage with the tests. Please let me know if there's any tidy-up required.

@peterbourgon
Copy link
Member

Badass! Thank you for your tolerance of my nitpicking. I think we have a great new addition to package sd. 💯

@peterbourgon peterbourgon merged commit d0853ee into go-kit:master Apr 10, 2017
@martinbaillie
Copy link
Contributor Author

No problem. Thanks for the effort you've put into go-kit. Looking forward to using it more!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants