Skip to content

Commit

Permalink
Fixed instancer logic for Consul
Browse files Browse the repository at this point in the history
The instancer loop never updated lastIndex but kept passing it to getInstances, which caused the Instancer to spam Consul repeatedly and nearly killed our Consul agent pods any time instances were added to or removed from Consul for that service.
  • Loading branch information
jkratz55 committed Feb 14, 2022
1 parent 121aeb2 commit b285909
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 2 deletions.
5 changes: 3 additions & 2 deletions sd/consul/instancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func (s *Instancer) loop(lastIndex uint64) {
instances []string
err error
d time.Duration = 10 * time.Millisecond
index uint64
)
for {
index := lastIndex
instances, index, err = s.getInstances(lastIndex, s.quitc)
switch {
case errors.Is(err, errStopped):
Expand All @@ -82,11 +82,12 @@ func (s *Instancer) loop(lastIndex uint64) {
time.Sleep(d)
d = conn.Exponential(d)
case index < lastIndex:
s.logger.Log("err", "index is less than previous; reseting to default")
s.logger.Log("err", "index is less than previous; resetting to default")
lastIndex = defaultIndex
time.Sleep(d)
d = conn.Exponential(d)
default:
lastIndex = index
s.cache.Update(sd.Event{Instances: instances})
d = 10 * time.Millisecond
}
Expand Down
64 changes: 64 additions & 0 deletions sd/consul/instancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consul

import (
"context"
"fmt"
"io"
"testing"
"time"
Expand Down Expand Up @@ -261,3 +262,66 @@ func TestInstancerWithInvalidIndex(t *testing.T) {
t.Error("failed, to receive call in time")
}
}

// indexTestClient wraps a testClient and tracks an index value so that a test
// can check the WaitIndex values passed to Service.
type indexTestClient struct {
// client is the wrapped test client that every call is delegated to.
client *testClient
// index is the value most recently written to QueryMeta.LastIndex by Service.
index uint64
// errs receives an error when Service sees a WaitIndex lower than index.
errs chan error
}

// newIndexTestClient builds an indexTestClient around c. The tracked index
// starts at its zero value, and index violations are reported on errs.
func newIndexTestClient(c *testClient, errs chan error) *indexTestClient {
	wrapped := new(indexTestClient) // index is left at zero
	wrapped.client = c
	wrapped.errs = errs
	return wrapped
}

// Register forwards the registration to the wrapped test client unchanged and
// returns whatever error that client reports.
func (i *indexTestClient) Register(r *consul.AgentServiceRegistration) error {
	err := i.client.Register(r)
	return err
}

// Deregister forwards the deregistration to the wrapped test client unchanged
// and returns whatever error that client reports.
func (i *indexTestClient) Deregister(r *consul.AgentServiceRegistration) error {
	err := i.client.Deregister(r)
	return err
}

// Service delegates to the wrapped test client and overwrites the returned
// QueryMeta.LastIndex with an index controlled by this wrapper, so a test can
// verify that the caller passes the last returned index back as WaitIndex.
//
// The first call (no index handed out yet and WaitIndex == 0) seeds the index
// at 100. Every later call reports an error on i.errs when WaitIndex is lower
// than the index most recently handed out, and then increments the index by
// one before returning it.
func (i *indexTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
	// Assume this is the first call to Service; the loop has not started yet.
	if i.index == 0 && queryOpts.WaitIndex == 0 {
		i.index = 100
		entries, meta, err := i.client.Service(service, tag, passingOnly, queryOpts)
		// Defensive: the delegate also returns an error, so do not assume
		// meta is always non-nil.
		if meta != nil {
			meta.LastIndex = i.index
		}
		return entries, meta, err
	}

	if queryOpts.WaitIndex < i.index {
		stale := fmt.Errorf("wait index %d is less than previous value %d", queryOpts.WaitIndex, i.index)
		// Non-blocking send: one pending error is enough to fail the test,
		// and blocking here would wedge the caller once nobody is reading.
		select {
		case i.errs <- stale:
		default:
		}
	}

	entries, meta, err := i.client.Service(service, tag, passingOnly, queryOpts)
	i.index++
	if meta != nil {
		meta.LastIndex = i.index
	}
	return entries, meta, err
}

// TestInstancerLoopIndex checks that the Instancer loop feeds the index
// returned by the previous query back in as the next WaitIndex. The
// indexTestClient reports a stale WaitIndex on errs; the test fails on the
// first such report and passes if none arrives within the observation window.
func TestInstancerLoopIndex(t *testing.T) {
	var (
		errs   = make(chan error, 1)
		logger = log.NewNopLogger()
		client = newIndexTestClient(newTestClient(consulState), errs)
	)

	instancer := NewInstancer(client, logger, "search", []string{"api"}, true)
	defer instancer.Stop()

	// Wait on the test goroutine itself: FailNow/Fatal must not be called
	// from another goroutine, and no reader is left behind after the test.
	select {
	case err := <-errs:
		t.Fatal(err)
	case <-time.After(2 * time.Second):
		// No stale index was observed while the loop was running.
	}
}

0 comments on commit b285909

Please sign in to comment.