Skip to content

Commit

Permalink
[2.8.1 Backport] - CBG-1252 Apply limit correctly during channel back…
Browse files Browse the repository at this point in the history
…fill (#4925)

Backports CBG-1246 to 2.8.1
  • Loading branch information
adamcfraser committed Jan 14, 2021
1 parent a5fea2d commit b1de96a
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 9 deletions.
17 changes: 12 additions & 5 deletions db/changes.go
Expand Up @@ -218,6 +218,7 @@ func (db *Database) changesFeed(singleChannelCache SingleChannelCache, options C
base.DebugfCtx(db.Ctx, base.KeyChanges, "[changesFeed] Found %d changes for channel %q", len(changes), base.UD(singleChannelCache.ChannelName()))

// Now write each log entry to the 'feed' channel in turn:
sentChanges := 0
for _, logEntry := range changes {
if logEntry.Sequence >= options.Since.TriggeredBy {
options.Since.TriggeredBy = 0
Expand All @@ -228,24 +229,30 @@ func (db *Database) changesFeed(singleChannelCache SingleChannelCache, options C
}

change := makeChangeEntry(logEntry, seqID, singleChannelCache.ChannelName())
lastSeq = logEntry.Sequence

// Don't include deletes or removals during initial channel backfill
if options.Since.TriggeredBy > 0 && (change.Deleted || len(change.Removed) > 0) {
continue
}

base.DebugfCtx(db.Ctx, base.KeyChanges, "Channel feed processing seq:%v in channel %s %s", seqID, base.UD(singleChannelCache.ChannelName()), base.UD(to))
select {
case <-options.Terminator:
base.DebugfCtx(db.Ctx, base.KeyChanges, "Terminating channel feed %s", base.UD(to))
return
case feed <- &change:
lastSeq = logEntry.Sequence
sentChanges++
}
}

// If the query returned fewer results than the query limit, we're done
if len(changes) < queryLimit {
// If the query returned fewer results than the pagination limit, we're done
if len(changes) < paginationOptions.Limit {
return
}

// If we've reached the request limit, we're done
itemsSent += len(changes)
itemsSent += sentChanges
if requestLimit > 0 && itemsSent >= requestLimit {
return
}
Expand Down Expand Up @@ -650,7 +657,7 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption
}
}

if options.ActiveOnly || minEntry.Seq.TriggeredBy > 0 {
if options.ActiveOnly {
if minEntry.Deleted || minEntry.allRemoved {
continue
}
Expand Down
68 changes: 64 additions & 4 deletions rest/changes_api_test.go
Expand Up @@ -648,8 +648,8 @@ func TestPostChangesAdminChannelGrantRemoval(t *testing.T) {
`{"seq":"28:4","id":"pbs-4","changes":[{"rev":"1-82214a562e80c8fa7b2361719847bc73"}]}`,
`{"seq":"28:5","id":"hbo-1","changes":[{"rev":"1-46f8c67c004681619052ee1a1cc8e104"}]}`,
`{"seq":"28:6","id":"hbo-2","changes":[{"rev":"1-46f8c67c004681619052ee1a1cc8e104"}]}`,
`{"seq":"28:15","id":"mix-1","removed":["HBO"],"changes":[{"rev":"2-0321dde33081a5ef566eecbe42ca3583"}]}`,
`{"seq":"28:16","id":"mix-2","removed":["PBS"],"changes":[{"rev":"2-5dcb551a0eb59eef3d98c64c29033d02"}]}`,
`{"seq":"28:15","id":"mix-1","changes":[{"rev":"2-0321dde33081a5ef566eecbe42ca3583"}]}`,
`{"seq":"28:16","id":"mix-2","changes":[{"rev":"2-5dcb551a0eb59eef3d98c64c29033d02"}]}`,
`{"seq":"28:22","id":"mix-5","changes":[{"rev":"3-8192afec7aa6986420be1d57f1677960"}]}`,
`{"seq":28,"id":"_user/bernard","changes":[]}`,
}
Expand Down Expand Up @@ -679,8 +679,8 @@ func TestPostChangesAdminChannelGrantRemoval(t *testing.T) {
// ensure we don't backfill from the start, but have everything from the compound sequence onwards
expectedResults = []string{
`{"seq":"28:6","id":"hbo-2","changes":[{"rev":"1-46f8c67c004681619052ee1a1cc8e104"}]}`,
`{"seq":"28:15","id":"mix-1","removed":["HBO"],"changes":[{"rev":"2-0321dde33081a5ef566eecbe42ca3583"}]}`,
`{"seq":"28:16","id":"mix-2","removed":["PBS"],"changes":[{"rev":"2-5dcb551a0eb59eef3d98c64c29033d02"}]}`,
`{"seq":"28:15","id":"mix-1","changes":[{"rev":"2-0321dde33081a5ef566eecbe42ca3583"}]}`,
`{"seq":"28:16","id":"mix-2","changes":[{"rev":"2-5dcb551a0eb59eef3d98c64c29033d02"}]}`,
`{"seq":"28:22","id":"mix-5","changes":[{"rev":"3-8192afec7aa6986420be1d57f1677960"}]}`,
`{"seq":28,"id":"_user/bernard","changes":[]}`,
`{"seq":29,"id":"pbs-5","changes":[{"rev":"1-82214a562e80c8fa7b2361719847bc73"}]}`,
Expand All @@ -703,6 +703,66 @@ func TestPostChangesAdminChannelGrantRemoval(t *testing.T) {
}
}

func TestPostChangesAdminChannelGrantRemovalWithLimit(t *testing.T) {
defer base.SetUpTestLogging(base.LevelInfo, base.KeyChanges, base.KeyHTTP)()

rt := NewRestTester(t, &RestTesterConfig{SyncFn: `function(doc) {channel(doc.channel);}`})
defer rt.Close()

// Create user with access to channel ABC:
a := rt.ServerContext().Database("db").Authenticator()
bernard, err := a.NewUser("bernard", "letmein", channels.SetOf(t, "ABC"))
assert.NoError(t, err)
assert.NoError(t, a.Save(bernard))

cacheWaiter := rt.GetDatabase().NewDCPCachingCountWaiter(t)

// Put several documents in channel PBS
pbs1 := rt.putDoc("pbs-1", `{"channel":["PBS"]}`)
pbs2 := rt.putDoc("pbs-2", `{"channel":["PBS"]}`)
pbs3 := rt.putDoc("pbs-3", `{"channel":["PBS"]}`)
pbs4 := rt.putDoc("pbs-4", `{"channel":["PBS"]}`)
cacheWaiter.AddAndWait(4)

// Mark the first four PBS docs as removals
_ = rt.putDoc("pbs-1", fmt.Sprintf(`{"_rev":%q}`, pbs1.Rev))
_ = rt.putDoc("pbs-2", fmt.Sprintf(`{"_rev":%q}`, pbs2.Rev))
_ = rt.putDoc("pbs-3", fmt.Sprintf(`{"_rev":%q}`, pbs3.Rev))
_ = rt.putDoc("pbs-4", fmt.Sprintf(`{"_rev":%q}`, pbs4.Rev))

cacheWaiter.AddAndWait(4)

// Add another pbs doc (with a higher sequence than the removals)
_ = rt.putDoc("pbs-5", `{"channel":["PBS"]}`)
cacheWaiter.AddAndWait(1)

// Grant user access to channel PBS
userResponse := rt.SendAdminRequest("PUT", "/db/_user/bernard", `{"admin_channels":["ABC","PBS"]}`)
assertStatus(t, userResponse, 200)

// Put several documents in channel ABC
_ = rt.putDoc("abc-1", `{"channel":["ABC"]}`)
_ = rt.putDoc("abc-2", `{"channel":["ABC"]}`)
_ = rt.putDoc("abc-3", `{"channel":["ABC"]}`)
cacheWaiter.AddAndWait(3)

// Issue changes request with limit less than 5. Expect to get pbs-5, user doc, and abc-1
changes, err := rt.WaitForChanges(0, "/db/_changes?limit=3", "bernard", false)
assert.NoError(t, err)
require.Equal(t, len(changes.Results), 3)
assert.Equal(t, "pbs-5", changes.Results[0].ID)
assert.Equal(t, "_user/bernard", changes.Results[1].ID)
assert.Equal(t, "abc-1", changes.Results[2].ID)
lastSeq := changes.Last_Seq

// Issue a second changes request, expect to see last 2 documents.
moreChanges, err := rt.WaitForChanges(0, fmt.Sprintf("/db/_changes?limit=3&since=%s", lastSeq), "bernard", false)
assert.NoError(t, err)
require.Equal(t, 2, len(moreChanges.Results))
assert.Equal(t, "abc-2", moreChanges.Results[0].ID)
assert.Equal(t, "abc-3", moreChanges.Results[1].ID)
}

// TestChangesFromCompoundSinceViaDocGrant ensures that a changes feed with a compound since value returns the correct result after a dynamic channel grant.
func TestChangesFromCompoundSinceViaDocGrant(t *testing.T) {

Expand Down

0 comments on commit b1de96a

Please sign in to comment.