Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions sd/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package etcd
import (
"crypto/tls"
"crypto/x509"
"errors"
"io/ioutil"
"net"
"net/http"
Expand All @@ -12,6 +13,11 @@ import (
"golang.org/x/net/context"
)

var (
ErrNoKey = errors.New("no key provided")
ErrNoValue = errors.New("no value provided")
)

// Client is a wrapper around the etcd client.
type Client interface {
// GetEntries will query the given prefix in etcd and returns a set of entries.
Expand All @@ -20,6 +26,11 @@ type Client interface {
// WatchPrefix starts watching every change for given prefix in etcd. When an
// change is detected it will populate the responseChan when an *etcd.Response.
WatchPrefix(prefix string, responseChan chan *etcd.Response)

// Register a service with etcd.
Register(s Service) error
// Deregister a service with etcd.
Deregister(s Service) error
}

type client struct {
Expand Down Expand Up @@ -112,10 +123,16 @@ func (c *client) GetEntries(key string) ([]string, error) {
}

entries := make([]string, len(resp.Node.Nodes))
for i, node := range resp.Node.Nodes {
entries[i] = node.Value

if len(entries) > 0 {
for i, node := range resp.Node.Nodes {
entries[i] = node.Value
}
} else {
entries = append(entries, resp.Node.Value)
}
return entries, nil

}

// WatchPrefix implements the etcd Client interface.
Expand All @@ -129,3 +146,22 @@ func (c *client) WatchPrefix(prefix string, responseChan chan *etcd.Response) {
responseChan <- res
}
}

func (c *client) Register(s Service) error {
if s.Key == "" {
return ErrNoKey
}
if s.Value == "" {
return ErrNoValue
}
_, err := c.keysAPI.Create(c.ctx, s.Key, s.Value)
return err
}

func (c *client) Deregister(s Service) error {
if s.Key == "" {
return ErrNoKey
}
_, err := c.keysAPI.Delete(c.ctx, s.Key, s.DeleteOptions)
return err
}
70 changes: 70 additions & 0 deletions sd/etcd/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package etcd

import (
"testing"
"time"

"golang.org/x/net/context"
)

func TestNewClient(t *testing.T) {
ClientOptions := ClientOptions{
Cert: "",
Key: "",
CaCert: "",
DialTimeout: (2 * time.Second),
DialKeepAline: (2 * time.Second),
HeaderTimeoutPerRequest: (2 * time.Second),
}

client, err := NewClient(
context.Background(),
[]string{"http://irrelevant:12345"},
ClientOptions,
)
if err != nil {
t.Fatalf("unexpected error creating client: %v", err)
}
if client == nil {
t.Fatal("expected new Client, got nil")
}
}

func TestOptions(t *testing.T) {
//creating new client should fail when providing invalid or missing endpoints
a, err := NewClient(
context.Background(),
[]string{},
ClientOptions{
Cert: "",
Key: "",
CaCert: "",
DialTimeout: (2 * time.Second),
DialKeepAline: (2 * time.Second),
HeaderTimeoutPerRequest: (2 * time.Second),
})

if err == nil {
t.Errorf("expected error: %v", err)
}
if a != nil {
t.Fatalf("expected client to be nil on failure")
}

//creating new client should fail when providing invalid or missing endpoints
_, err = NewClient(
context.Background(),
[]string{"http://irrelevant:12345"},
ClientOptions{
Cert: "blank.crt",
Key: "blank.key",
CaCert: "blank.cacert",
DialTimeout: (2 * time.Second),
DialKeepAline: (2 * time.Second),
HeaderTimeoutPerRequest: (2 * time.Second),
})

if err == nil {
t.Errorf("expected error: %v", err)
}
}
117 changes: 117 additions & 0 deletions sd/etcd/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// +build integration

package etcd

import (
"flag"
"kit/log"
"os"
"testing"
"time"

etcdc "github.com/coreos/etcd/client"
etcdi "github.com/coreos/etcd/integration"
"golang.org/x/net/context"
)

var (
host []string
kitClientOptions ClientOptions
)

func TestMain(m *testing.M) {
flag.Parse()

kitClientOptions = ClientOptions{
Cert: "",
Key: "",
CaCert: "",
DialTimeout: (2 * time.Second),
DialKeepAline: (2 * time.Second),
HeaderTimeoutPerRequest: (2 * time.Second),
}

code := m.Run()

os.Exit(code)
}

func TestRegistrar(t *testing.T) {
ts := etcdi.NewCluster(t, 1)
ts.Launch(t)
kitClient, err := NewClient(context.Background(), []string{ts.URL(0)}, kitClientOptions)

// Valid registrar should pass
registrar := NewRegistrar(kitClient, Service{
Key: "somekey",
Value: "somevalue",
DeleteOptions: &etcdc.DeleteOptions{
PrevValue: "",
PrevIndex: 0,
Recursive: true,
Dir: false,
},
}, log.NewNopLogger())

registrar.Register()
r1, err := kitClient.GetEntries(registrar.service.Key)
if err != nil {
t.Fatalf("unexpected error when getting value for deregistered key: %v", err)
}

if want, have := registrar.service.Value, r1[0]; want != have {
t.Fatalf("want %q, have %q", want, have)
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if want, have := registrar.service.Value, r1[0]; want != have {
    t.Fatalf("want %q, have %q", want, have)
}


registrar.Deregister()
r2, err := kitClient.GetEntries(registrar.service.Key)
if len(r2) > 0 {
t.Fatalf("unexpected value found for deregistered key: %s", r2)
}

// Registrar with no key should register but value will be blank
registrarNoKey := NewRegistrar(kitClient, Service{
Key: "",
Value: "somevalue",
DeleteOptions: &etcdc.DeleteOptions{
PrevValue: "",
PrevIndex: 0,
Recursive: true,
Dir: false,
},
}, log.NewNopLogger())

registrarNoKey.Register()
r3, err := kitClient.GetEntries(registrarNoKey.service.Key)
if err != nil {
t.Errorf("unexpected error when getting value for entry with no key: %v", err)
}

if want, have := "", r3[0]; want != have {
t.Fatalf("want %q, have %q", want, have)
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if want, have := "", r3[0]; want != have {
    t.Fatalf("want %q, have %q", want, have)
}


// Registrar with no value should not register anything
registrarNoValue := NewRegistrar(kitClient, Service{
Key: "somekey",
Value: "",
DeleteOptions: &etcdc.DeleteOptions{
PrevValue: "",
PrevIndex: 0,
Recursive: true,
Dir: false,
},
}, log.NewNopLogger())

registrarNoValue.Register()
r4, err := kitClient.GetEntries(registrarNoValue.service.Key)
if err == nil {
t.Errorf("expected error when getting value for entry key which attempted to register with no value")
}

if len(r4) > 0 {
t.Fatalf("unexpected value retreived when getting value for entry with no value")
}

ts.Terminate(t)
}
52 changes: 52 additions & 0 deletions sd/etcd/registrar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package etcd

import (
etcd "github.com/coreos/etcd/client"
"github.com/go-kit/kit/log"
)

// Registrar registers service instance liveness information to etcd.
type Registrar struct {
client Client
service Service
logger log.Logger
}

// Service holds the key, value and instance identifying data you
// want to publish to etcd.
type Service struct {
Key string // discovery key, example: /myorganization/myplatform/
Value string // service name value, example: addsvc
DeleteOptions *etcd.DeleteOptions
}

// NewRegistrar returns a etcd Registrar acting on the provided catalog
// registration.
func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar {
return &Registrar{
client: client,
service: service,
logger: log.NewContext(logger).With(
"value", service.Value,
"key", service.Key,
),
}
}

// Register implements sd.Registrar interface.
func (r *Registrar) Register() {
if err := r.client.Register(r.service); err != nil {
r.logger.Log("err", err)
} else {
r.logger.Log("action", "register")
}
}

// Deregister implements sd.Registrar interface.
func (r *Registrar) Deregister() {
if err := r.client.Deregister(r.service); err != nil {
r.logger.Log("err", err)
} else {
r.logger.Log("action", "deregister")
}
}
7 changes: 7 additions & 0 deletions sd/etcd/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,10 @@ func (c *fakeClient) GetEntries(prefix string) ([]string, error) {
}

func (c *fakeClient) WatchPrefix(prefix string, responseChan chan *stdetcd.Response) {}

func (c *fakeClient) Register(Service) error {
return nil
}
func (c *fakeClient) Deregister(Service) error {
return nil
}