Skip to content

Commit

Permalink
Merge pull request #90277 from cockroachdb/blathers/backport-release-22.2-90241
Browse files Browse the repository at this point in the history

release-22.2: changefeedccl: flush to sink before initial_scan_only completion
  • Loading branch information
samiskin committed Oct 19, 2022
2 parents 3aeba77 + feb22af commit ed1d8f5
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 23 deletions.
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
if err := ca.tick(); err != nil {
var e kvevent.ErrBufferClosed
if errors.As(err, &e) {
// ErrBufferClosed is a signal that
// our kvfeed has exited expectedly.
// ErrBufferClosed is a signal that our kvfeed has exited expectedly.
err = e.Unwrap()
if errors.Is(err, kvevent.ErrNormalRestartReason) {
err = nil
Expand Down
39 changes: 22 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6256,40 +6256,45 @@ func TestChangefeedOnlyInitialScan(t *testing.T) {

for testName, changefeedStmt := range initialScanOnlyTests {
t.Run(testName, func(t *testing.T) {
sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)")
sqlDB.Exec(t, "INSERT INTO foo VALUES (1), (2), (3)")
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo (a) SELECT * FROM generate_series(1, 5000);`)

feed := feed(t, f, changefeedStmt)

sqlDB.Exec(t, "INSERT INTO foo VALUES (4), (5), (6)")
sqlDB.Exec(t, "INSERT INTO foo VALUES (5005), (5007), (5009)")

seenMoreMessages := false
g := ctxgroup.WithContext(context.Background())
var expectedMessages []string
for i := 1; i <= 5000; i++ {
expectedMessages = append(expectedMessages, fmt.Sprintf(
`foo: [%d]->{"after": {"a": %d}}`, i, i,
))
}
var seenMessages []string
g.Go(func() error {
assertPayloads(t, feed, []string{
`foo: [1]->{"after": {"a": 1}}`,
`foo: [2]->{"after": {"a": 2}}`,
`foo: [3]->{"after": {"a": 3}}`,
})
for {
_, err := feed.Next()
m, err := feed.Next()
if err != nil {
return err
}
seenMoreMessages = true
seenMessages = append(seenMessages, fmt.Sprintf(`%s: %s->%s`, m.Topic, m.Key, m.Value))
}
})
defer func() {
closeFeed(t, feed)
sqlDB.Exec(t, `DROP TABLE foo`)
_ = g.Wait()
require.False(t, seenMoreMessages)
}()

jobFeed := feed.(cdctest.EnterpriseTestFeed)
require.NoError(t, jobFeed.WaitForStatus(func(s jobs.Status) bool {
return s == jobs.StatusSucceeded
}))

closeFeed(t, feed)
sqlDB.Exec(t, `DROP TABLE foo`)
_ = g.Wait()
require.Equal(t, len(expectedMessages), len(seenMessages))
sort.Strings(expectedMessages)
sort.Strings(seenMessages)
for i := range expectedMessages {
require.Equal(t, expectedMessages[i], seenMessages[i])
}
})
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/kvevent/chunked_event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func (l *bufferEventChunkQueue) dequeue() (e Event, ok bool) {
}
l.head = l.head.next
freeBufferEventChunk(toFree)
return l.dequeue()
if !ok {
return l.dequeue()
}
}

if !ok {
Expand Down Expand Up @@ -117,7 +119,7 @@ func (bec *bufferEventChunk) pop() (e Event, ok bool, consumedAll bool) {

e = bec.events[bec.head]
bec.head++
return e, true, false
return e, true, bec.head == bufferEventChunkArrSize
}

func (bec *bufferEventChunk) empty() bool {
Expand Down
24 changes: 23 additions & 1 deletion pkg/ccl/changefeedccl/kvevent/chunked_event_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,30 @@ func TestBufferEntryQueue(t *testing.T) {
assert.True(t, ok)
assert.True(t, q.empty())

// Add events to fill 5 chunks and assert they are consumed in fifo order.
// Fill 5 chunks and then pop each one, ensuring empty() returns the correct
// value each time.
eventCount := bufferEventChunkArrSize * 5
for i := 0; i < eventCount; i++ {
q.enqueue(Event{})
}
for {
assert.Equal(t, eventCount <= 0, q.empty())
_, ok = q.dequeue()
if !ok {
assert.True(t, q.empty())
break
} else {
eventCount--
}
}
assert.Equal(t, 0, eventCount)
q.enqueue(Event{})
assert.False(t, q.empty())
q.dequeue()
assert.True(t, q.empty())

// Add events to fill 5 chunks and assert they are consumed in fifo order.
eventCount = bufferEventChunkArrSize * 5
var lastPop int64 = -1
var lastPush int64 = -1

Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,10 @@ func Run(ctx context.Context, cfg Config) error {
// Regardless of whether drain succeeds, we must also close the buffer to release
// any resources, and to let the consumer (changeAggregator) know that no more writes
// are expected so that it can transition to a draining state.
err = errors.CombineErrors(f.writer.Drain(ctx), f.writer.CloseWithReason(ctx, kvevent.ErrNormalRestartReason))
err = errors.CombineErrors(
f.writer.Drain(ctx),
f.writer.CloseWithReason(ctx, kvevent.ErrNormalRestartReason),
)

if err == nil {
// This context is canceled by the change aggregator when it receives
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,9 @@ func (c *cloudFeed) Next() (*cdctest.TestFeedMessage, error) {
m.Resolved = nil
return m, nil
case changefeedbase.OptFormatCSV:
if isNew := c.markSeen(m); !isNew {
continue
}
return m, nil
default:
return nil, errors.Errorf(`unknown %s: %s`, changefeedbase.OptFormat, v)
Expand Down

0 comments on commit ed1d8f5

Please sign in to comment.