Skip to content

Commit

Permalink
Merge pull request #547 from seh/remove-eureka-test-race
Browse files Browse the repository at this point in the history
sd/eureka: remove data races in unit tests
  • Loading branch information
peterbourgon committed Jun 8, 2017
2 parents 7fe3c30 + a843a9e commit 714eef7
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 111 deletions.
43 changes: 18 additions & 25 deletions sd/eureka/instancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,10 @@ func NewInstancer(conn fargoConnection, app string, logger log.Logger) *Instance
quitc: make(chan chan struct{}),
}

instances, err := s.getInstances()
if err == nil {
s.logger.Log("instances", len(instances))
} else {
s.logger.Log("during", "getInstances", "err", err)
}

s.cache.Update(sd.Event{Instances: instances, Err: err})
go s.loop()
done := make(chan struct{})
updates := conn.ScheduleAppUpdates(app, true, done)
s.consume(<-updates)
go s.loop(updates, done)
return s
}

Expand All @@ -53,26 +48,24 @@ func (s *Instancer) Stop() {
s.quitc = nil
}

func (s *Instancer) loop() {
var (
await = false
done = make(chan struct{})
updatec = s.conn.ScheduleAppUpdates(s.app, await, done)
)
func (s *Instancer) consume(update fargo.AppUpdate) {
if update.Err != nil {
s.logger.Log("during", "Update", "err", update.Err)
s.cache.Update(sd.Event{Err: update.Err})
return
}
instances := convertFargoAppToInstances(update.App)
s.logger.Log("instances", len(instances))
s.cache.Update(sd.Event{Instances: instances})
}

func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) {
defer close(done)

for {
select {
case update := <-updatec:
if update.Err != nil {
s.logger.Log("during", "Update", "err", update.Err)
s.cache.Update(sd.Event{Err: update.Err})
continue
}
instances := convertFargoAppToInstances(update.App)
s.logger.Log("instances", len(instances))
s.cache.Update(sd.Event{Instances: instances})

case update := <-updates:
s.consume(update)
case q := <-s.quitc:
close(q)
return
Expand Down
72 changes: 29 additions & 43 deletions sd/eureka/instancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,94 +13,80 @@ var _ sd.Instancer = &Instancer{} // API check

func TestInstancer(t *testing.T) {
connection := &testConnection{
instances: []*fargo.Instance{instanceTest1},
application: appUpdateTest,
instances: []*fargo.Instance{instanceTest1, instanceTest2},
errApplication: nil,
}

instancer := NewInstancer(connection, appNameTest, loggerTest)
defer instancer.Stop()

state := instancer.cache.State()
state := instancer.state()
if state.Err != nil {
t.Fatal(state.Err)
}

if want, have := 1, len(state.Instances); want != have {
if want, have := 2, len(state.Instances); want != have {
t.Errorf("want %d, have %d", want, have)
}
}

func TestInstancerScheduleUpdates(t *testing.T) {
func TestInstancerReceivesUpdates(t *testing.T) {
connection := &testConnection{
instances: []*fargo.Instance{instanceTest1},
application: appUpdateTest,
errApplication: nil,
}

instancer := NewInstancer(connection, appNameTest, loggerTest)
defer instancer.Stop()

state := instancer.cache.State()
if want, have := 1, len(state.Instances); want != have {
t.Errorf("want %d, have %d", want, have)
verifyCount := func(want int) (have int, converged bool) {
const maxPollAttempts = 5
const delayPerAttempt = 200 * time.Millisecond
for i := 1; ; i++ {
state := instancer.state()
if have := len(state.Instances); want == have {
return have, true
} else if i == maxPollAttempts {
return have, false
}
time.Sleep(delayPerAttempt)
}
}

time.Sleep(50 * time.Millisecond)

state = instancer.cache.State()
if want, have := 2, len(state.Instances); want != have {
t.Errorf("want %v, have %v", want, have)
if have, converged := verifyCount(1); !converged {
t.Fatalf("initial: want %d, have %d", 1, have)
}
}

func TestBadInstancerInstances(t *testing.T) {
connection := &testConnection{
instances: []*fargo.Instance{},
errInstances: errTest,
application: appUpdateTest,
errApplication: nil,
if err := connection.RegisterInstance(instanceTest2); err != nil {
t.Fatalf("failed to register an instance: %v", err)
}

instancer := NewInstancer(connection, appNameTest, loggerTest)
defer instancer.Stop()

state := instancer.cache.State()
if state.Err == nil {
t.Fatal("expecting error")
if have, converged := verifyCount(2); !converged {
t.Fatalf("after registration: want %d, have %d", 2, have)
}

if want, have := 0, len(state.Instances); want != have {
t.Errorf("want %d, have %d", want, have)
if err := connection.DeregisterInstance(instanceTest1); err != nil {
t.Fatalf("failed to unregister an instance: %v", err)
}
if have, converged := verifyCount(1); !converged {
t.Fatalf("after deregistration: want %d, have %d", 1, have)
}
}

func TestBadInstancerScheduleUpdates(t *testing.T) {
connection := &testConnection{
instances: []*fargo.Instance{instanceTest1},
application: appUpdateTest,
errApplication: errTest,
}

instancer := NewInstancer(connection, appNameTest, loggerTest)
defer instancer.Stop()

state := instancer.cache.State()
if state.Err != nil {
t.Error(state.Err)
}
if want, have := 1, len(state.Instances); want != have {
t.Errorf("want %d, have %d", want, have)
}

time.Sleep(50 * time.Millisecond)

state = instancer.cache.State()
state := instancer.state()
if state.Err == nil {
t.Fatal("expecting error")
}

if want, have := 0, len(state.Instances); want != have {
t.Errorf("want %v, have %v", want, have)
t.Errorf("want %d, have %d", want, have)
}
}
36 changes: 26 additions & 10 deletions sd/eureka/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type fargoUnsuccessfulHTTPResponse struct {
messagePrefix string
}

func (u *fargoUnsuccessfulHTTPResponse) Error() string {
return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
}

// Registrar maintains service instance liveness information in Eureka.
type Registrar struct {
conn fargoConnection
Expand Down Expand Up @@ -110,18 +114,30 @@ func (r *Registrar) loop() {
}
}

func httpResponseStatusCode(err error) (code int, present bool) {
if code, ok := fargo.HTTPResponseStatusCode(err); ok {
return code, true
}
// Allow injection of errors for testing.
if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok {
return u.statusCode, true
}
return 0, false
}

func isNotFound(err error) bool {
code, ok := httpResponseStatusCode(err)
return ok && code == http.StatusNotFound
}

func (r *Registrar) heartbeat() error {
err := r.conn.HeartBeatInstance(r.instance)
if err != nil {
if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok && u.statusCode == http.StatusNotFound {
// Instance expired (e.g. network partition). Re-register.
r.logger.Log("during", "heartbeat", err.Error())
return r.conn.ReregisterInstance(r.instance)
}
if err == nil {
return nil
}
if isNotFound(err) {
// Instance expired (e.g. network partition). Re-register.
return r.conn.ReregisterInstance(r.instance)
}
return err
}

func (u *fargoUnsuccessfulHTTPResponse) Error() string {
return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
}
110 changes: 77 additions & 33 deletions sd/eureka/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package eureka

import (
"errors"
"fmt"
"reflect"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/hudl/fargo"
)

type testConnection struct {
instances []*fargo.Instance
application *fargo.Application
errInstances error
mu sync.RWMutex
instances []*fargo.Instance

errApplication error
errHeartbeat error
errRegister error
Expand All @@ -23,10 +26,6 @@ var (
errNotFound = &fargoUnsuccessfulHTTPResponse{statusCode: 404, messagePrefix: "not found"}
loggerTest = log.NewNopLogger()
appNameTest = "go-kit"
appUpdateTest = &fargo.Application{
Name: appNameTest,
Instances: []*fargo.Instance{instanceTest1, instanceTest2},
}
instanceTest1 = &fargo.Instance{
HostName: "serveregistrar1.acme.org",
Port: 8080,
Expand Down Expand Up @@ -59,50 +58,95 @@ var (
var _ fargoConnection = (*testConnection)(nil)

func (c *testConnection) RegisterInstance(i *fargo.Instance) error {
if c.errRegister == nil {
for _, instance := range c.instances {
if reflect.DeepEqual(*instance, *i) {
return errors.New("already registered")
}
if c.errRegister != nil {
return c.errRegister
}
c.mu.Lock()
defer c.mu.Unlock()
for _, instance := range c.instances {
if reflect.DeepEqual(*instance, *i) {
return errors.New("already registered")
}

c.instances = append(c.instances, i)
}
return c.errRegister
c.instances = append(c.instances, i)
return nil
}

func (c *testConnection) HeartBeatInstance(i *fargo.Instance) error {
return c.errHeartbeat
}

func (c *testConnection) DeregisterInstance(i *fargo.Instance) error {
if c.errDeregister == nil {
var newInstances []*fargo.Instance
for _, instance := range c.instances {
if reflect.DeepEqual(*instance, *i) {
continue
}
newInstances = append(newInstances, instance)
}
if len(newInstances) == len(c.instances) {
return errors.New("not registered")
if c.errDeregister != nil {
return c.errDeregister
}
c.mu.Lock()
defer c.mu.Unlock()
remaining := make([]*fargo.Instance, 0, len(c.instances))
for _, instance := range c.instances {
if reflect.DeepEqual(*instance, *i) {
continue
}

c.instances = newInstances
remaining = append(remaining, instance)
}
return c.errDeregister
if len(remaining) == len(c.instances) {
return errors.New("not registered")
}
c.instances = remaining
return nil
}

func (c *testConnection) ReregisterInstance(ins *fargo.Instance) error {
return nil
}

func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate {
updatec := make(chan fargo.AppUpdate, 1)
updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication}
return updatec
func (c *testConnection) instancesForApplication(name string) []*fargo.Instance {
c.mu.RLock()
defer c.mu.RUnlock()
instances := make([]*fargo.Instance, 0, len(c.instances))
for _, i := range c.instances {
if i.App == name {
instances = append(instances, i)
}
}
return instances
}

func (c *testConnection) GetApp(name string) (*fargo.Application, error) {
return &fargo.Application{Name: appNameTest, Instances: c.instances}, c.errInstances
if err := c.errApplication; err != nil {
return nil, err
}
instances := c.instancesForApplication(name)
if len(instances) == 0 {
return nil, fmt.Errorf("Application not found for name=%s", name)
}
return &fargo.Application{Name: name, Instances: instances}, nil
}

func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate {
updatec := make(chan fargo.AppUpdate, 1)
send := func() {
app, err := c.GetApp(name)
select {
case updatec <- fargo.AppUpdate{App: app, Err: err}:
default:
}
}

if await {
send()
}
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ticker.C:
send()
case <-done:
ticker.Stop()
return
}
}
}()
return updatec
}

0 comments on commit 714eef7

Please sign in to comment.