From 5a8fc428bde736feb2bf8c43050799a4fe860510 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 16 Feb 2021 15:28:00 -0500 Subject: [PATCH] Merge pull request #9772 from hashicorp/streamin-fix-bad-cached-snapshot streaming: fix snapshot cache bug --- .changelog/9772.txt | 4 ++ agent/consul/stream/event_publisher.go | 35 +++++----- agent/consul/stream/event_publisher_test.go | 75 +++++++++++++++++++++ 3 files changed, 96 insertions(+), 18 deletions(-) create mode 100644 .changelog/9772.txt diff --git a/.changelog/9772.txt b/.changelog/9772.txt new file mode 100644 index 000000000000..6dd048f997a0 --- /dev/null +++ b/.changelog/9772.txt @@ -0,0 +1,4 @@ +```release-note:bug +streaming: fixes a bug caused by caching an incorrect snapshot, that would cause clients +to error until the cache expired. +``` diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 769e875d8373..db9ceb656470 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -176,28 +176,27 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) } snapFromCache := e.getCachedSnapshotLocked(req) - if req.Index == 0 && snapFromCache != nil { - return e.subscriptions.add(req, snapFromCache.First), nil + if snapFromCache == nil { + snap := newEventSnapshot() + snap.appendAndSplice(*req, handler, topicHead) + e.setCachedSnapshotLocked(req, snap) + snapFromCache = snap } - snap := newEventSnapshot() - // if the request has an Index the client view is stale and must be reset - // with a NewSnapshotToFollow event. - if req.Index > 0 { - snap.buffer.Append([]Event{{ - Topic: req.Topic, - Payload: newSnapshotToFollow{}, - }}) - - if snapFromCache != nil { - snap.buffer.AppendItem(snapFromCache.First) - return e.subscriptions.add(req, snap.First), nil - } + // If the request.Index is 0 the client has no view, send a full snapshot. + if req.Index == 0 { + return e.subscriptions.add(req, snapFromCache.First), nil } - snap.appendAndSplice(*req, handler, topicHead) - e.setCachedSnapshotLocked(req, snap) - return e.subscriptions.add(req, snap.First), nil + // otherwise the request has an Index, the client view is stale and must be reset + // with a NewSnapshotToFollow event. + result := newEventSnapshot() + result.buffer.Append([]Event{{ + Topic: req.Topic, + Payload: newSnapshotToFollow{}, + }}) + result.buffer.AppendItem(snapFromCache.First) + return e.subscriptions.add(req, result.First), nil } func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription { diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 576d4ccc3516..2967ef8d3da5 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -392,6 +392,81 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin }) } +func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testing.T) { + req := &SubscribeRequest{ + Topic: testTopic, + Key: "sub-key", + Index: 1, + } + + nextEvent := Event{ + Topic: testTopic, + Index: 3, + Payload: simplePayload{key: "sub-key", value: "event-3"}, + } + + handlers := SnapshotHandlers{ + testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { + if req.Topic != testTopic { + return 0, fmt.Errorf("unexpected topic: %v", req.Topic) + } + buf.Append([]Event{testSnapshotEvent}) + buf.Append([]Event{nextEvent}) + return 3, nil + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + publisher := NewEventPublisher(handlers, time.Second) + go publisher.Run(ctx) + // Include the same events in the topicBuffer + publisher.publishEvent([]Event{testSnapshotEvent}) + publisher.publishEvent([]Event{nextEvent}) + + runStep(t, "start a subscription and unsub", func(t *testing.T) { + sub, err := publisher.Subscribe(req) + require.NoError(t, err) + defer sub.Unsubscribe() + + eventCh := runSubscription(ctx, sub) + next := getNextEvent(t, eventCh) + require.True(t, next.IsNewSnapshotToFollow(), next) + + next = getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) + + next = getNextEvent(t, eventCh) + require.Equal(t, nextEvent, next) + + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot(), next) + require.Equal(t, uint64(3), next.Index) + }) + + publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) { + return 0, fmt.Errorf("error should not be seen, cache should have been used") + } + + runStep(t, "resume the subscription", func(t *testing.T) { + newReq := *req + newReq.Index = 0 + sub, err := publisher.Subscribe(&newReq) + require.NoError(t, err) + + eventCh := runSubscription(ctx, sub) + next := getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) + + next = getNextEvent(t, eventCh) + require.Equal(t, nextEvent, next) + + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot()) + }) +} + func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.Helper() if !t.Run(name, fn) {