Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 68 additions & 66 deletions sd/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,32 @@ import (
)

var (
ErrNoKey = errors.New("no key provided")
// ErrNoKey indicates a client method needs a key but receives none.
ErrNoKey = errors.New("no key provided")

// ErrNoValue indicates a client method needs a value but receives none.
ErrNoValue = errors.New("no value provided")
)

// Client is a wrapper around the etcd client.
type Client interface {
// GetEntries will query the given prefix in etcd and returns a set of entries.
// GetEntries queries the given prefix in etcd and returns a slice
// containing the values of all keys found, recursively, underneath that
// prefix.
GetEntries(prefix string) ([]string, error)

// WatchPrefix starts watching every change for given prefix in etcd. When an
// change is detected it will populate the responseChan when an *etcd.Response.
WatchPrefix(prefix string, responseChan chan *etcd.Response)
// WatchPrefix watches the given prefix in etcd for changes. When a change
// is detected, it will signal on the passed channel. Clients are expected
// to call GetEntries to update themselves with the latest set of complete
// values. WatchPrefix will always send an initial sentinel value on the
// channel after establishing the watch, to ensure that clients always
// receive the latest set of values. WatchPrefix will block until the
// context passed to the NewClient constructor is terminated.
WatchPrefix(prefix string, ch chan struct{})

// Register a service with etcd.
Register(s Service) error

// Deregister a service with etcd.
Deregister(s Service) error
}
Expand All @@ -38,81 +49,71 @@ type client struct {
ctx context.Context
}

// ClientOptions defines options for the etcd client.
// ClientOptions defines options for the etcd client. All values are optional.
// If any duration is not specified, a default of 3 seconds will be used.
type ClientOptions struct {
Cert string
Key string
CaCert string
CACert string
DialTimeout time.Duration
DialKeepAlive time.Duration
HeaderTimeoutPerRequest time.Duration
}

// NewClient returns an *etcd.Client with a connection to the named machines.
// It will return an error if a connection to the cluster cannot be made.
// The parameter machines needs to be a full URL with schemas.
// e.g. "http://localhost:2379" will work, but "localhost:2379" will not.
// NewClient returns Client with a connection to the named machines. It will
// return an error if a connection to the cluster cannot be made. The parameter
// machines needs to be a full URL with schemas. e.g. "http://localhost:2379"
// will work, but "localhost:2379" will not.
func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
var (
c etcd.KeysAPI
err error
caCertCt []byte
tlsCert tls.Certificate
)
if options.DialTimeout == 0 {
options.DialTimeout = 3 * time.Second
}
if options.DialKeepAlive == 0 {
options.DialKeepAlive = 3 * time.Second
}
if options.HeaderTimeoutPerRequest == 0 {
options.HeaderTimeoutPerRequest = 3 * time.Second
}

transport := etcd.DefaultTransport
if options.Cert != "" && options.Key != "" {
tlsCert, err = tls.LoadX509KeyPair(options.Cert, options.Key)
tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key)
if err != nil {
return nil, err
}

caCertCt, err = ioutil.ReadFile(options.CaCert)
caCertCt, err := ioutil.ReadFile(options.CACert)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCertCt)

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{tlsCert},
RootCAs: caCertPool,
}

transport := &http.Transport{
TLSClientConfig: tlsConfig,
Dial: func(network, addr string) (net.Conn, error) {
dial := &net.Dialer{
transport = &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{tlsCert},
RootCAs: caCertPool,
},
Dial: func(network, address string) (net.Conn, error) {
return (&net.Dialer{
Timeout: options.DialTimeout,
KeepAlive: options.DialKeepAlive,
}
return dial.Dial(network, addr)
}).Dial(network, address)
},
}
}

cfg := etcd.Config{
Endpoints: machines,
Transport: transport,
HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
}
ce, err := etcd.New(cfg)
if err != nil {
return nil, err
}
c = etcd.NewKeysAPI(ce)
} else {
cfg := etcd.Config{
Endpoints: machines,
Transport: etcd.DefaultTransport,
HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
}
ce, err := etcd.New(cfg)
if err != nil {
return nil, err
}
c = etcd.NewKeysAPI(ce)
ce, err := etcd.New(etcd.Config{
Endpoints: machines,
Transport: transport,
HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
})
if err != nil {
return nil, err
}

return &client{c, ctx}, nil
return &client{
keysAPI: etcd.NewKeysAPI(ce),
ctx: ctx,
}, nil
}

// GetEntries implements the etcd Client interface.
Expand All @@ -122,28 +123,29 @@ func (c *client) GetEntries(key string) ([]string, error) {
return nil, err
}

entries := make([]string, len(resp.Node.Nodes))
// Special case. Note that it's possible that len(resp.Node.Nodes) == 0 and
// resp.Node.Value is also empty, in which case the key is empty and we
// should not return any entries.
if len(resp.Node.Nodes) == 0 && resp.Node.Value != "" {
return []string{resp.Node.Value}, nil
}

if len(entries) > 0 {
for i, node := range resp.Node.Nodes {
entries[i] = node.Value
}
} else {
entries = append(entries, resp.Node.Value)
entries := make([]string, len(resp.Node.Nodes))
for i, node := range resp.Node.Nodes {
entries[i] = node.Value
}
return entries, nil

}

// WatchPrefix implements the etcd Client interface.
func (c *client) WatchPrefix(prefix string, responseChan chan *etcd.Response) {
func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
watch := c.keysAPI.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})
ch <- struct{}{} // make sure caller invokes GetEntries
for {
res, err := watch.Next(c.ctx)
if err != nil {
if _, err := watch.Next(c.ctx); err != nil {
return
}
responseChan <- res
ch <- struct{}{}
}
}

Expand Down
42 changes: 18 additions & 24 deletions sd/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,14 @@ import (
)

func TestNewClient(t *testing.T) {
ClientOptions := ClientOptions{
Cert: "",
Key: "",
CaCert: "",
DialTimeout: (2 * time.Second),
DialKeepAlive: (2 * time.Second),
HeaderTimeoutPerRequest: (2 * time.Second),
}

client, err := NewClient(
context.Background(),
[]string{"http://irrelevant:12345"},
ClientOptions,
ClientOptions{
DialTimeout: 2 * time.Second,
DialKeepAlive: 2 * time.Second,
HeaderTimeoutPerRequest: 2 * time.Second,
},
)
if err != nil {
t.Fatalf("unexpected error creating client: %v", err)
Expand All @@ -30,40 +25,39 @@ func TestNewClient(t *testing.T) {
}
}

// NewClient should fail when providing invalid or missing endpoints.
func TestOptions(t *testing.T) {
//creating new client should fail when providing invalid or missing endpoints
a, err := NewClient(
context.Background(),
[]string{},
ClientOptions{
Cert: "",
Key: "",
CaCert: "",
DialTimeout: (2 * time.Second),
DialKeepAlive: (2 * time.Second),
HeaderTimeoutPerRequest: (2 * time.Second),
})

CACert: "",
DialTimeout: 2 * time.Second,
DialKeepAlive: 2 * time.Second,
HeaderTimeoutPerRequest: 2 * time.Second,
},
)
if err == nil {
t.Errorf("expected error: %v", err)
}
if a != nil {
t.Fatalf("expected client to be nil on failure")
}

//creating new client should fail when providing invalid or missing endpoints
_, err = NewClient(
context.Background(),
[]string{"http://irrelevant:12345"},
ClientOptions{
Cert: "blank.crt",
Key: "blank.key",
CaCert: "blank.cacert",
DialTimeout: (2 * time.Second),
DialKeepAlive: (2 * time.Second),
HeaderTimeoutPerRequest: (2 * time.Second),
})

CACert: "blank.CACert",
DialTimeout: 2 * time.Second,
DialKeepAlive: 2 * time.Second,
HeaderTimeoutPerRequest: 2 * time.Second,
},
)
if err == nil {
t.Errorf("expected error: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion sd/etcd/doc.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
// Package etcd provides a subscriber implementation for etcd.
// Package etcd provides a Subscriber and Registrar implementation for etcd. If
// you use etcd as your service discovery system, this package will help you
// implement the registration and client-side load balancing patterns.
package etcd
86 changes: 42 additions & 44 deletions sd/etcd/example_test.go
Original file line number Diff line number Diff line change
@@ -1,69 +1,67 @@
package etcd

import (
"fmt"
"time"
"io"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/endpoint"
"golang.org/x/net/context"

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd/lb"
)

// Package sd/etcd provides a wrapper around the coroes/etcd key value store (https://github.com/coreos/etcd)
// This example assumes the user has an instance of etcd installed and running locally on port 2379
func Example() {

// Let's say this is a service that means to register itself.
// First, we will set up some context.
var (
prefix = "/services/foosvc/" // known at compile time
instance = "1.2.3.4:8080" // taken from runtime or platform, somehow
key = prefix + instance
value = "http://" + instance // based on our transport
etcdServer = "http://10.0.0.1:2379" // don't forget schema and port!
prefix = "/services/foosvc/" // known at compile time
instance = "1.2.3.4:8080" // taken from runtime or platform, somehow
key = prefix + instance // should be globally unique
value = "http://" + instance // based on our transport
ctx = context.Background()
)

client, err := NewClient(context.Background(), []string{"http://:2379"}, ClientOptions{
DialTimeout: 2 * time.Second,
DialKeepAlive: 2 * time.Second,
HeaderTimeoutPerRequest: 2 * time.Second,
})
// Build the client.
client, err := NewClient(ctx, []string{etcdServer}, ClientOptions{})
if err != nil {
panic(err)
}

// Instantiate new instance of *Registrar passing in test data
// Build the registrar.
registrar := NewRegistrar(client, Service{
Key: key,
Value: value,
}, log.NewNopLogger())
// Register new test data to etcd
registrar.Register()

//Retrieve entries from etcd
_, err = client.GetEntries(key)
if err != nil {
fmt.Println(err)
}

factory := func(string) (endpoint.Endpoint, io.Closer, error) {
return endpoint.Nop, nil, nil
}
subscriber, _ := NewSubscriber(client, prefix, factory, log.NewNopLogger())

endpoints, err := subscriber.Endpoints()
if err != nil {
fmt.Printf("err: %v", err)
}
fmt.Println(len(endpoints)) // hopefully 1
// Register our instance.
registrar.Register()

// Deregister first instance of test data
registrar.Deregister()
// At the end of our service lifecycle, for example at the end of func main,
// we should make sure to deregister ourselves. This is important! Don't
// accidentally skip this step by invoking a log.Fatal or os.Exit in the
// interim, which bypasses the defer stack.
defer registrar.Deregister()

endpoints, err = subscriber.Endpoints()
// It's likely that we'll also want to connect to other services and call
// their methods. We can build a subscriber to listen for changes from etcd
// and build endpoints, wrap it with a load-balancer to pick a single
// endpoint, and finally wrap it with a retry strategy to get something that
// can be used as an endpoint directly.
barPrefix := "/services/barsvc"
subscriber, err := NewSubscriber(client, barPrefix, barFactory, log.NewNopLogger())
if err != nil {
fmt.Printf("err: %v", err)
panic(err)
}
fmt.Println(len(endpoints)) // hopefully 0
balancer := lb.NewRoundRobin(subscriber)
retry := lb.Retry(3, 3*time.Second, balancer)

// Verify test data no longer exists in etcd
_, err = client.GetEntries(key)
if err != nil {
fmt.Println(err)
// And now retry can be used like any other endpoint.
req := struct{}{}
if _, err = retry(ctx, req); err != nil {
panic(err)
}
}

func barFactory(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil }
Loading