Skip to content

Commit

Permalink
Merge pull request go-kit#100 from xla/consul-publisher
Browse files Browse the repository at this point in the history
Add Consul publisher
  • Loading branch information
peterbourgon committed Oct 29, 2015
2 parents d7a4637 + bd47757 commit 853badf
Show file tree
Hide file tree
Showing 3 changed files with 412 additions and 0 deletions.
30 changes: 30 additions & 0 deletions loadbalancer/consul/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package consul

import consul "github.com/hashicorp/consul/api"

// Client is a wrapper around the Consul API.
type Client interface {
Service(service string, tag string, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error)
}

type client struct {
consul *consul.Client
}

// NewClient returns an implementation of the Client interface expecting a fully
// setup Consul Client.
func NewClient(c *consul.Client) Client {
return &client{
consul: c,
}
}

// GetInstances returns the list of healthy entries for a given service filtered
// by tag.
func (c *client) Service(
service string,
tag string,
opts *consul.QueryOptions,
) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
return c.consul.Health().Service(service, tag, true, opts)
}
175 changes: 175 additions & 0 deletions loadbalancer/consul/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package consul

import (
"fmt"

consul "github.com/hashicorp/consul/api"

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/loadbalancer"
"github.com/go-kit/kit/log"
)

const defaultIndex = 0

// Publisher yields endpoints for a service in Consul. Updates to the service
// are watched and will update the Publisher endpoints.
type Publisher struct {
cache *loadbalancer.EndpointCache
client Client
logger log.Logger
service string
tags []string
endpointsc chan []endpoint.Endpoint
quitc chan struct{}
}

// NewPublisher returns a Consul publisher which returns Endpoints for the
// requested service. It only returns instances for which all of the passed
// tags are present.
func NewPublisher(
client Client,
factory loadbalancer.Factory,
logger log.Logger,
service string,
tags ...string,
) (*Publisher, error) {
logger = log.NewContext(logger).With("component", "Consul Publisher")

p := &Publisher{
cache: loadbalancer.NewEndpointCache(factory, logger),
client: client,
logger: logger,
service: service,
tags: tags,
quitc: make(chan struct{}),
}

is, index, err := p.getInstances(defaultIndex)
if err != nil {
return nil, err
}

p.cache.Replace(is)

go p.loop(index)

return p, nil
}

// Endpoints implements the Publisher interface.
func (p *Publisher) Endpoints() ([]endpoint.Endpoint, error) {
return p.cache.Endpoints()
}

// Stop terminates the publisher.
func (p *Publisher) Stop() {
close(p.quitc)
}

func (p *Publisher) loop(lastIndex uint64) {
var (
errc = make(chan error, 1)
resc = make(chan response, 1)
)

for {
go func() {
is, index, err := p.getInstances(lastIndex)
if err != nil {
errc <- err
return
}

resc <- response{
index: index,
instances: is,
}
}()

select {
case err := <-errc:
p.logger.Log("service", p.service, "err", err)
case res := <-resc:
p.cache.Replace(res.instances)
lastIndex = res.index
case <-p.quitc:
return
}
}
}

func (p *Publisher) getInstances(lastIndex uint64) ([]string, uint64, error) {
tag := ""

if len(p.tags) > 0 {
tag = p.tags[0]
}

entries, meta, err := p.client.Service(
p.service,
tag,
&consul.QueryOptions{
WaitIndex: lastIndex,
},
)
if err != nil {
return nil, 0, err
}

// If more than one tag is passed we need to filter it in the publisher until
// Consul supports multiple tags[0].
//
// [0] https://github.com/hashicorp/consul/issues/294
if len(p.tags) > 1 {
entries = filterEntries(entries, p.tags[1:]...)
}

return makeInstances(entries), meta.LastIndex, nil
}

// response is used as container to transport instances as well as the updated
// index.
type response struct {
index uint64
instances []string
}

func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry {
var es []*consul.ServiceEntry

ENTRIES:
for _, entry := range entries {
ts := make(map[string]struct{}, len(entry.Service.Tags))

for _, tag := range entry.Service.Tags {
ts[tag] = struct{}{}
}

for _, tag := range tags {
if _, ok := ts[tag]; !ok {
continue ENTRIES
}
}

es = append(es, entry)
}

return es
}

func makeInstances(entries []*consul.ServiceEntry) []string {
is := make([]string, len(entries))

for i, entry := range entries {
addr := entry.Node.Address

if entry.Service.Address != "" {
addr = entry.Service.Address
}

is[i] = fmt.Sprintf("%s:%d", addr, entry.Service.Port)
}

return is
}
Loading

0 comments on commit 853badf

Please sign in to comment.