Commit 11d1c90

tests: fix flaky TestAgent (#4203)

* tests: fix flaky TestAgent

* Apply suggestions from code review

Co-authored-by: Jakub Warczarek <jakub.warczarek@konghq.com>

* tests: address review comments, cleanup

* tests: don't use assertions in goroutines in TestAgent

* tests: don't use assertions in goroutines in TestAgent

---------

Co-authored-by: Jakub Warczarek <jakub.warczarek@konghq.com>
pmalek and programmer04 committed Jun 21, 2023
1 parent 31b1ba2 commit 11d1c90
Showing 8 changed files with 316 additions and 34 deletions.
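
The last two commits in the message above point at a classic Go testing pitfall: testify's require helpers call t.FailNow, and the testing package documents that FailNow must be called from the goroutine running the test, not from goroutines the test spawns — violating that can hang the run or swallow a failure instead of reporting it. A minimal sketch of the pitfall and two safe alternatives (illustrative only, not code from this PR; the agent type is a stand-in):

package example

import (
	"context"
	"errors"
	"testing"
)

type agent struct{}

// Start blocks until the context is cancelled, like the license agent's Start.
func (agent) Start(ctx context.Context) error { <-ctx.Done(); return ctx.Err() }

func TestStart(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	a := agent{}

	// Unsafe: require.NoError calls t.FailNow, which must not run outside
	// the test goroutine:
	//
	//	go func() {
	//		err := a.Start(ctx)
	//		require.NoError(t, err) // may hang the test or hide the failure
	//	}()

	// Safe option 1: drop the in-goroutine assertion (the route this PR takes).
	go a.Start(ctx) //nolint:errcheck

	// Safe option 2: ship the error back and assert on the test goroutine.
	errCh := make(chan error, 1)
	go func() { errCh <- a.Start(ctx) }()
	cancel()
	if err := <-errCh; err != nil && !errors.Is(err, context.Canceled) {
		t.Fatalf("unexpected error: %v", err)
	}
}
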
4 changes: 0 additions & 4 deletions internal/adminapi/backoff_strategy_konnect.go
@@ -26,10 +26,6 @@ type Clock interface {
 	Now() time.Time
 }
 
-type SystemClock struct{}
-
-func (SystemClock) Now() time.Time { return time.Now() }
-
 // KonnectBackoffStrategy keeps track of Konnect config push backoffs.
 //
 // It takes into account:
3 changes: 2 additions & 1 deletion internal/adminapi/client.go
@@ -11,6 +11,7 @@ import (
 	k8stypes "k8s.io/apimachinery/pkg/types"
 
 	"github.com/kong/kubernetes-ingress-controller/v2/internal/util"
+	"github.com/kong/kubernetes-ingress-controller/v2/internal/util/clock"
 )
 
 // Client is a wrapper around raw *kong.Client. It's advised to pass this wrapper across the codebase, and
@@ -61,7 +62,7 @@ func NewKonnectClient(c *kong.Client, runtimeGroup string) *KonnectClient {
 			konnectRuntimeGroup: runtimeGroup,
 			pluginSchemaStore:   util.NewPluginSchemaStore(c),
 		},
-		backoffStrategy: NewKonnectBackoffStrategy(SystemClock{}),
+		backoffStrategy: NewKonnectBackoffStrategy(clock.System{}),
 	}
 }
35 changes: 31 additions & 4 deletions internal/license/agent.go
@@ -11,6 +11,7 @@ import (
 	"github.com/samber/mo"
 
 	"github.com/kong/kubernetes-ingress-controller/v2/internal/util"
+	"github.com/kong/kubernetes-ingress-controller/v2/internal/util/clock"
 )
 
 const (
@@ -52,6 +53,20 @@ func WithPollingPeriod(regularPollingPeriod time.Duration) AgentOpt {
 	}
 }
 
+type Ticker interface {
+	Stop()
+	Channel() <-chan time.Time
+	Reset(d time.Duration)
+}
+
+// WithTicker sets the ticker in Agent. This is useful for testing.
+// Ticker doesn't define the period, it defines the implementation of ticking.
+func WithTicker(t Ticker) AgentOpt {
+	return func(a *Agent) {
+		a.ticker = t
+	}
+}
+
 // NewAgent creates a new license agent that retrieves a license from the given url once every given period.
 func NewAgent(
 	konnectLicenseClient KonnectLicenseClient,
@@ -63,6 +78,9 @@ func NewAgent(
 		konnectLicenseClient: konnectLicenseClient,
 		initialPollingPeriod: DefaultInitialPollingPeriod,
 		regularPollingPeriod: DefaultPollingPeriod,
+		// Note: the ticker defines the implementation of ticking, not the period.
+		ticker:    clock.NewTicker(),
+		startedCh: make(chan struct{}),
 	}
 
 	for _, opt := range opts {
@@ -78,6 +96,8 @@ type Agent struct {
 	konnectLicenseClient KonnectLicenseClient
 	initialPollingPeriod time.Duration
 	regularPollingPeriod time.Duration
+	ticker    Ticker
+	startedCh chan struct{}
 
 	// cachedLicense is the current license retrieved from upstream. It's optional because we may not have retrieved a
 	// license yet.
@@ -122,21 +142,28 @@ func (a *Agent) GetLicense() mo.Option[kong.License] {
 	return mo.None[kong.License]()
 }
 
+// Started returns a channel which will be closed when the Agent has started.
+func (a *Agent) Started() <-chan struct{} {
+	return a.startedCh
+}
+
 // runPollingLoop updates the license on a regular period until the context is cancelled.
 // It will run at a faster period initially, and then switch to the regular period once a license is retrieved.
 func (a *Agent) runPollingLoop(ctx context.Context) error {
-	ticker := time.NewTicker(a.resolvePollingPeriod())
-	defer ticker.Stop()
+	a.ticker.Reset(a.initialPollingPeriod)
+	defer a.ticker.Stop()
 
+	ch := a.ticker.Channel()
+	close(a.startedCh)
 	for {
 		select {
-		case <-ticker.C:
+		case <-ch:
 			a.logger.V(util.DebugLevel).Info("retrieving license from external service")
 			if err := a.reconcileLicenseWithKonnect(ctx); err != nil {
 				a.logger.Error(err, "could not reconcile license with Konnect")
			}
 			// Reset the ticker to run with the expected period which may change after we receive the license.
-			ticker.Reset(a.resolvePollingPeriod())
+			a.ticker.Reset(a.resolvePollingPeriod())
 		case <-ctx.Done():
 			a.logger.Info("context done, shutting down license agent")
 			return ctx.Err()
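
Taken together, these hooks make the polling loop testable without real time: a test injects its own Ticker, waits for Started() to close — which guarantees the loop is already selecting on the ticker's channel, so a manually fired tick cannot be lost — and then drives polls by hand. A rough sketch of the pattern, assuming a controllable fake with an Add method like the mocks.Ticker used in the tests below:

fake := mocks.NewTicker()
a := license.NewAgent(upstreamClient, logr.Discard(), license.WithTicker(fake))

go a.Start(ctx) //nolint:errcheck
<-a.Started() // the loop now owns fake.Channel(); ticks can no longer be dropped

fake.Add(20 * time.Minute) // advance fake time: exactly one poll fires, no sleeping
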
88 changes: 63 additions & 25 deletions internal/license/agent_test.go
@@ -12,24 +12,34 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/kong/kubernetes-ingress-controller/v2/internal/license"
+	"github.com/kong/kubernetes-ingress-controller/v2/internal/util/clock"
+	"github.com/kong/kubernetes-ingress-controller/v2/test/mocks"
 )
 
 type mockKonnectClientClient struct {
 	konnectLicense mo.Option[license.KonnectLicense]
 	err            error
 	getCalls       []time.Time
 	lock           sync.RWMutex
+	clock          Clock
 }
 
-func newMockKonnectLicenseClient(license mo.Option[license.KonnectLicense]) *mockKonnectClientClient {
-	return &mockKonnectClientClient{konnectLicense: license}
+type Clock interface {
+	Now() time.Time
+}
+
+func newMockKonnectLicenseClient(license mo.Option[license.KonnectLicense], clock Clock) *mockKonnectClientClient {
+	return &mockKonnectClientClient{
+		konnectLicense: license,
+		clock:          clock,
+	}
 }
 
 func (m *mockKonnectClientClient) Get(context.Context) (mo.Option[license.KonnectLicense], error) {
 	m.lock.Lock()
 	defer m.lock.Unlock()
 
-	m.getCalls = append(m.getCalls, time.Now())
+	m.getCalls = append(m.getCalls, m.clock.Now())
 
 	if m.err != nil {
 		return mo.None[license.KonnectLicense](), m.err
@@ -83,53 +93,60 @@ func TestAgent(t *testing.T) {
 			}
 			matchTime = time.Now()
 			return true
-		}, time.Second, time.Nanosecond)
+		}, time.Second, time.Millisecond)
 		return matchTime
 	}
 
 	t.Run("initial license is retrieved", func(t *testing.T) {
-		upstreamClient := newMockKonnectLicenseClient(mo.Some(expectedLicense))
+		upstreamClient := newMockKonnectLicenseClient(mo.Some(expectedLicense), clock.System{})
 		a := license.NewAgent(upstreamClient, logr.Discard())
-		go func() {
-			err := a.Start(ctx)
-			require.NoError(t, err)
-		}()
+		go a.Start(ctx) //nolint:errcheck
 		expectLicenseToMatchEventually(t, a, expectedLicense.Payload)
 	})
 
 	t.Run("initial license retrieval fails and recovers", func(t *testing.T) {
-		upstreamClient := newMockKonnectLicenseClient(mo.None[license.KonnectLicense]())
+		ticker := mocks.NewTicker()
+
+		upstreamClient := newMockKonnectLicenseClient(mo.None[license.KonnectLicense](), ticker)
+
 		// Return an error on the first call to List() to verify that the agent handles this correctly.
 		upstreamClient.ReturnError(errors.New("something went wrong on a backend"))
 
 		const (
 			// Set the initial polling period to a very short duration to ensure that the agent retries quickly.
-			initialPollingPeriod = time.Millisecond
-			regularPollingPeriod = time.Millisecond * 5
-			allowedDelta         = time.Millisecond
+			initialPollingPeriod = time.Minute * 3
+			regularPollingPeriod = time.Minute * 20
+			allowedDelta         = time.Second
 		)
 
 		a := license.NewAgent(
 			upstreamClient,
 			logr.Discard(),
 			license.WithInitialPollingPeriod(initialPollingPeriod),
 			license.WithPollingPeriod(regularPollingPeriod),
+			license.WithTicker(ticker),
 		)
 
 		startTime := time.Now()
-		go func() {
-			err := a.Start(ctx)
-			require.NoError(t, err)
-		}()
+		go a.Start(ctx) //nolint:errcheck
+
+		select {
+		case <-a.Started():
+		case <-time.After(time.Second):
+			require.FailNow(t, "timed out waiting for agent to start")
+		}
 
 		t.Run("initial polling period is used when no license is retrieved", func(t *testing.T) {
 			require.Eventually(t, func() bool {
 				return len(upstreamClient.GetCalls()) >= 1
 			}, time.Second, time.Nanosecond, "expected upstream client to be called at least once")
 
 			firstListCallTime := upstreamClient.GetCalls()[0]
-			require.WithinDuration(t, startTime.Add(initialPollingPeriod), firstListCallTime, allowedDelta,
-				"expected first call to List() to happen after the initial polling period")
+			require.WithinDuration(t, startTime, firstListCallTime, allowedDelta,
+				"expected first call to List() to happen immediately after starting the agent")
+
+			// Initial polling period has passed...
+			ticker.Add(initialPollingPeriod)
 
 			require.Eventually(t, func() bool {
 				return len(upstreamClient.GetCalls()) >= 2
@@ -145,24 +162,38 @@ func TestAgent(t *testing.T) {
 		t.Run("regular polling period is used after the initial license is retrieved", func(t *testing.T) {
 			// Now return a valid response to ensure that the agent recovers.
 			upstreamClient.ReturnSuccess(mo.Some(expectedLicense))
+
+			// Regular polling period has passed...
+			ticker.Add(regularPollingPeriod)
+
 			expectLicenseToMatchEventually(t, a, expectedLicense.Payload)
 
 			listCallsAfterMatchCount := len(upstreamClient.GetCalls())
+
+			// Regular polling period has passed...
+			ticker.Add(regularPollingPeriod)
+
 			require.Eventually(t, func() bool {
 				return len(upstreamClient.GetCalls()) > listCallsAfterMatchCount
-			}, time.Second, time.Nanosecond, "expected upstream client to be called at least once after the license is retrieved")
+			}, time.Second, time.Millisecond, "expected upstream client to be called at least once after the license is retrieved")
 
-			listCalls := upstreamClient.GetCalls()
-			lastListCall := listCalls[len(listCalls)-1]
-			lastButOneCall := listCalls[len(listCalls)-2]
-			require.WithinDuration(t, lastButOneCall.Add(regularPollingPeriod), lastListCall, allowedDelta)
+			require.Eventually(t, func() bool {
+				listCalls := upstreamClient.GetCalls()
+				lastListCall := listCalls[len(listCalls)-1]
+				lastButOneCall := listCalls[len(listCalls)-2]
+				return lastListCall.Sub(lastButOneCall).Abs() <= allowedDelta
+			}, time.Second, time.Millisecond)
 		})
 
 		t.Run("after the license is retrieved, errors returned from upstream do not override the license", func(t *testing.T) {
 			upstreamClient.ReturnError(errors.New("something went wrong on a backend"))
 
 			// Wait for the call to happen.
 			initialListCalls := len(upstreamClient.GetCalls())
+
+			// Regular polling period has passed...
+			ticker.Add(regularPollingPeriod)
+
 			require.Eventually(t, func() bool {
 				return len(upstreamClient.GetCalls()) > initialListCalls
 			}, time.Second, time.Nanosecond)
@@ -179,6 +210,9 @@
 
 			// Wait for the call to happen.
 			initialListCalls := len(upstreamClient.GetCalls())
+			// Regular polling period has passed...
+			ticker.Add(regularPollingPeriod)
+
 			require.Eventually(t, func() bool {
 				return len(upstreamClient.GetCalls()) > initialListCalls
 			}, time.Second, time.Nanosecond)
@@ -192,6 +226,10 @@
 				Payload:   "new-license",
 				UpdatedAt: expectedLicense.UpdatedAt.Add(time.Second),
 			}))
+
+			// Regular polling period has passed...
+			ticker.Add(regularPollingPeriod)
+
 			expectLicenseToMatchEventually(t, a, "new-license")
 		})
 	})
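
The mocks.NewTicker() used above comes from test/mocks, one of the two changed files this view doesn't expand, so its implementation is not shown here. A plausible reconstruction — every detail below is an assumption — is a hand-driven fake that satisfies both the agent's Ticker interface and the test's Clock interface, primes one tick at construction (which would explain why the first List() call is expected immediately after start), and emits exactly one tick per Add:

package mocks

import (
	"sync"
	"time"
)

// Ticker is a sketch of a manually driven Ticker/Clock for tests; the real
// implementation in test/mocks is not shown in this diff view.
type Ticker struct {
	mu  sync.Mutex
	now time.Time
	ch  chan time.Time
}

func NewTicker() *Ticker {
	t := &Ticker{now: time.Now(), ch: make(chan time.Time, 64)}
	t.ch <- t.now // prime one tick so the consumer polls immediately on start
	return t
}

// Now lets the mock double as the Clock used to timestamp GetCalls.
func (t *Ticker) Now() time.Time {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.now
}

func (t *Ticker) Stop()                     {}
func (t *Ticker) Channel() <-chan time.Time { return t.ch }
func (t *Ticker) Reset(d time.Duration)     {} // period is irrelevant: ticks are driven by Add

// Add advances the fake clock by d and delivers exactly one tick.
func (t *Ticker) Add(d time.Duration) {
	t.mu.Lock()
	t.now = t.now.Add(d)
	now := t.now
	t.mu.Unlock()
	t.ch <- now
}
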
7 changes: 7 additions & 0 deletions internal/util/clock/clock.go
@@ -0,0 +1,7 @@
+package clock
+
+import "time"
+
+type System struct{}
+
+func (System) Now() time.Time { return time.Now() }
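
Moving the clock behind this small interface in its own package means anything that accepts a Clock — such as KonnectBackoffStrategy above — can be tested against frozen time. A minimal test double might look like this (a sketch, not part of the commit):

// ManualClock returns a fixed instant until it is advanced by hand.
type ManualClock struct {
	now time.Time
}

func (c *ManualClock) Now() time.Time          { return c.now }
func (c *ManualClock) Advance(d time.Duration) { c.now = c.now.Add(d) }
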
34 changes: 34 additions & 0 deletions internal/util/clock/ticker.go
@@ -0,0 +1,34 @@
+package clock
+
+import "time"
+
+const (
+	// This is irrelevant for the ticker, but we need to pass something to NewTicker.
+	// The reason for this is that the ticker is used in the license agent, which
+	// uses a non trivial logic to determine the polling period based on the state
+	// of license retrieval.
+	// This might be changed in the future if it doesn't fit the future needs.
+	initialTickerDuration = 1000 * time.Hour
+)
+
+func NewTicker() *TimeTicker {
+	return &TimeTicker{
+		ticker: time.NewTicker(initialTickerDuration),
+	}
+}
+
+type TimeTicker struct {
+	ticker *time.Ticker
+}
+
+func (t *TimeTicker) Stop() {
+	t.ticker.Stop()
+}
+
+func (t *TimeTicker) Channel() <-chan time.Time {
+	return t.ticker.C
+}
+
+func (t *TimeTicker) Reset(d time.Duration) {
+	t.ticker.Reset(d)
+}
